Posted to user@flink.apache.org by Abhijeet Kumar <ab...@sentienz.com> on 2018/11/23 06:54:53 UTC

error while joining two datastream

Hello Team,

I'm new to Flink and come from a Spark background. I need help completing this streaming job. I'm reading data from two different Kafka topics and I want to join them.

My code:

formatStream1.join(formatStream2)
		.where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
			public String getKey(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1) throws Exception {
				return t1.f0;
			}
		})
		.equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
			public String getKey(Tuple7<String, String, String, String, String, String, Long> t1) throws Exception {
				return t1.f0;
			}
		}).window(TumblingEventTimeWindows.of(Time.seconds(15)))
		.apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

					public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
							Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
							Tuple7<String, String, String, String, String, String, Long> second) {
						return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
					}
				}).print();


Error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
	at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
	at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

Data is arriving in both formatStream1 and formatStream2; I checked by printing them. So the issue is in the code I shared. Thanks in advance!

Thanks,

	
Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !
abhijeet.kumar@sentienz.com | www.sentienz.com | Bengaluru



Re: error while joining two datastream

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

I assume that

		withTimestampsAndWatermarks1.print();
		withTimestampsAndWatermarks2.print();

actually print what you expected?

If so, the problem might be that:
a) time/watermarks are not progressing (watermarks trigger the output of your `TumblingEventTimeWindows.of(Time.seconds(15))`)
b) data are not being joined, because:
  - there are no matching elements (based on your KeySelectors) to join between those two streams
  - elements are out of sync with respect to the window length (within any given 15-second tumbling window, there are no elements from both streams to join)
c) the streams are producing different event times/watermarks (for example, one is far ahead of the other). A windowed join produces results only once the watermarks of both streams have caught up with each other.
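
To make points (b) and (c) concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a 15-second event-time tumbling window is assigned and when a windowed join can fire. The class and method names are hypothetical, and the model is simplified: it assumes a window offset of 0, non-negative timestamps in milliseconds, and that a two-input operator's watermark is the minimum of its two input watermarks.

```java
/** Simplified plain-Java model of an event-time tumbling window join (not Flink code). */
class WindowJoinModel {

    /** Start of the tumbling window that contains a non-negative timestamp (offset 0). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** The join sees event time as the minimum of the watermarks of its two inputs. */
    static long operatorWatermark(long watermark1, long watermark2) {
        return Math.min(watermark1, watermark2);
    }

    /** A window [start, start + size) can fire once the watermark reaches its last millisecond. */
    static boolean canFire(long windowStartMs, long sizeMs, long watermark) {
        return watermark >= windowStartMs + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 15_000L; // 15-second window, in milliseconds
        long start = windowStart(7_000L, size); // window [0, 15000)

        // Stream 1 is far ahead, stream 2 lags: the window cannot fire yet.
        System.out.println(canFire(start, size, operatorWatermark(60_000L, 5_000L))); // false

        // Once the slower stream's watermark reaches the window end, it can fire.
        System.out.println(canFire(start, size, operatorWatermark(60_000L, 14_999L))); // true

        // Same key, but timestamps 14 s and 16 s fall into different windows.
        System.out.println(windowStart(14_000L, size) == windowStart(16_000L, size)); // false
    }
}
```

In this model, two elements with the same key are joined only if they fall into the same window, and nothing is emitted until the slower stream's watermark passes the end of that window. For point (a), it may also be worth checking that the job actually runs with the event-time characteristic, since periodic watermarks may otherwise not be generated in this Flink version; the thread does not show that part of the setup, so this is only a guess.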
  
Piotrek 



Re: error while joining two datastream

Posted by Abhijeet Kumar <ab...@sentienz.com>.
DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>> withTimestampsAndWatermarks1 = formatStream1
		.assignTimestampsAndWatermarks(
				new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>(
						Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

					private static final long serialVersionUID = 1L;

					// Event time of stream 1 is taken from field f10 (Flink expects epoch milliseconds).
					@Override
					public long extractTimestamp(
							Tuple11<String, String, String, String, String, String, String, String, String, String, Long> element) {
						return element.f10;
					}
				});

DataStream<Tuple7<String, String, String, String, String, String, Long>> withTimestampsAndWatermarks2 = formatStream2
		.assignTimestampsAndWatermarks(
				new BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, String, String, String, Long>>(
						Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

					private static final long serialVersionUID = 1L;

					// Event time of stream 2 is taken from field f6.
					@Override
					public long extractTimestamp(
							Tuple7<String, String, String, String, String, String, Long> element) {
						return element.f6;
					}
				});

withTimestampsAndWatermarks1.print();
withTimestampsAndWatermarks2.print();

// Join both streams on field f0 within 15-second event-time tumbling windows.
DataStream<Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>> joined = withTimestampsAndWatermarks1
		.join(withTimestampsAndWatermarks2)
		.where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {

			private static final long serialVersionUID = 1L;

			@Override
			public String getKey(
					Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1)
					throws Exception {
				return t1.f0;
			}
		}).equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {

			private static final long serialVersionUID = 1L;

			@Override
			public String getKey(Tuple7<String, String, String, String, String, String, Long> t1)
					throws Exception {
				return t1.f0;
			}
		}).window(TumblingEventTimeWindows.of(Time.seconds(15)))
		.apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

			private static final long serialVersionUID = 1L;

			// Output: f0..f9 of the first tuple, f1..f6 of the second, then the first tuple's timestamp (f10).
			@Override
			public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
					Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
					Tuple7<String, String, String, String, String, String, Long> second) {
				return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
			}
		});

joined.print();

OK, so now I did it like this. The error is resolved, but now I don't see any output when I print the joined DataStream.



Re: error while joining two datastream

Posted by Nagarjun Guraja <na...@gmail.com>.
Looks like you need to assign timestamps and emit watermarks for both
streams, i.e. formatStream1 and formatStream2, as described at
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
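
As a rough illustration of what "assign timestamps and emit watermarks" means, the plain-Java sketch below models a bounded-out-of-orderness watermark: the highest event timestamp seen so far minus a fixed allowed delay. It does not use Flink; the class and method names are hypothetical, and timestamps are assumed to be epoch milliseconds.

```java
/** Simplified plain-Java model of a bounded-out-of-orderness watermark (not Flink code). */
class BoundedOutOfOrdernessModel {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records the event timestamp of one element; late elements do not move the maximum back. */
    void observe(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
    }

    /** Current watermark; stays at Long.MIN_VALUE until the first element has been observed. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel model = new BoundedOutOfOrdernessModel(2_000L);
        // Elements arrive slightly out of order.
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) {
            model.observe(ts);
        }
        System.out.println(model.currentWatermark()); // prints 10000
    }
}
```

Event-time windows only close when this kind of watermark passes the end of the window, so a stream that has never been given timestamps and watermarks cannot drive them.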

Regards,
Nagarjun

Success is not final, failure is not fatal: it is the courage to continue that counts.
- Winston Churchill -