Posted to user@spark.apache.org by ravidspark <ra...@gmail.com> on 2018/10/27 00:10:01 UTC

Is spark not good for ingesting into updatable databases?

Hi All,

My problem is as follows:

Environment: Spark 2.2.0 installed on CDH
Use-Case: Reading from Kafka, cleansing the data and ingesting into a
non-updatable (insert-only) database.

Problem: My streaming batch duration is 1 minute and I am receiving 3000
messages/min. I am observing a weird case where, in the map transformations,
some of the messages are passed to the downstream transformations more than
once. Because of this I have been seeing duplicates in the downstream
insert-only database.

It would have made sense if the reprocessing had happened for the entire
task, in which case I would have assumed the cause was a task failure. But in
my case I don't see any task failures, and only one or two particular
messages in the task are reprocessed.

Every time I relaunch the Spark job to process the Kafka messages from the
starting offset, it duplicates the exact same messages, regardless of the
number of relaunches.

I added the messages that are getting duplicated back to Kafka at a different
offset to see whether I would observe the same problem, but this time they
were not duplicated.

Workaround for now:
As a workaround, I added a cache at the end, just before ingestion into the
DB. It is updated with each processed event, which makes sure an event won't
be ingested again.
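
A minimal sketch of the kind of cache described above, assuming each event
carries a unique key such as the request_id that appears in the logs later in
this thread. The class name ProcessedEventCache and its size bound are
hypothetical, and this is plain Java rather than the actual job code. A cache
that lives in one JVM would not deduplicate across executors or survive a
restart.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical bounded cache of already-ingested event keys (illustration only). */
class ProcessedEventCache {
    private final Set<String> seen;

    ProcessedEventCache(final int maxEntries) {
        // A LinkedHashMap that evicts its eldest entry once the bound is exceeded,
        // exposed as a Set because only the keys matter here.
        Map<String, Boolean> backing = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
        this.seen = Collections.newSetFromMap(backing);
    }

    /** Returns true the first time a key is offered, false for a repeat. */
    synchronized boolean markIfNew(String eventKey) {
        return seen.add(eventKey);
    }
}
```

The ingestion step would call markIfNew(requestId) and write the row only when
it returns true. Once a key has been evicted it is treated as new again, so the
bound has to be larger than the window in which repeats can occur.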


My question is: why am I seeing this weird behavior (only one particular
message in the entire batch getting reprocessed)? Is there some
configuration that would help me fix this problem, or is this a bug?

Any solution apart from maintaining a cache would be of great help.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Is spark not good for ingesting into updatable databases?

Posted by ravidspark <ra...@gmail.com>.
Hi Jorn,

I just wanted to check whether you got a chance to look at this problem. I
couldn't figure out why this is happening. Any help would be appreciated.




Re: Is spark not good for ingesting into updatable databases?

Posted by ravidspark <ra...@gmail.com>.
Hi Jorn,

Thanks for your kind reply. I accept that there might be something in the
code. Any help would be appreciated.

To give you some insight, I checked whether the message appears twice in the
Kafka source, but I could only find it once. Also, it would have been more
convincing if all the messages were duplicated instead of only a few.
Please find below my source code and also the log lines, collected from all
the logs, for the message that is getting duplicated:


		<kafka stream>

		JavaDStream<Map<String, Object>> prePepForMappedJsonStream = stream.map(
				new Function<String, Map<String, Object>>() {

			// Field of the function object: its value persists across call() invocations.
			Map<String, Object> mappedJson = null;

			@Override
			public Map<String, Object> call(String inputJsonMessage) {
				try {
					// Inputs of length 2 skip the block below.
					if (StringUtils.length(inputJsonMessage) != 2) {
						mappedJson = new HashMap<>();
						StopWatch watch = StopWatchSingleton.instance();
						watch.reset();
						watch.start();
						logger.info("Transformation-1 Start:{} & Input Message is: {}",
								LocalDateTime.now(), inputJsonMessage);
						JsonToMapPrePepTransformer instance = new JsonToMapPrePepTransformer(
								StringUtils.join(store.getName().toLowerCase(), "_yellowbrick"));
						mappedJson = instance.transformJsonToMap(inputJsonMessage);
						watch.stop();
						logger.info("Transformation-1 End:{}, Elapsed:{} & OutputMessage is: {}",
								LocalDateTime.now(), watch.getTime(), mappedJson);
					}
				} catch (Exception e) {
					logger.error("", e);
				}
				return mappedJson;
			}
		});

		JavaDStream<Map<String, Object>> transformedStream = prePepForMappedJsonStream.map(
				new Function<Map<String, Object>, Map<String, Object>>() {

			// Also a field, so it persists across call() invocations.
			Map<String, Object> resultMap = null;

			@Override
			public Map<String, Object> call(Map<String, Object> readyToTransformMap) throws Exception {
				if (readyToTransformMap != null) {
					StopWatch watch = StopWatchSingleton.instance();
					watch.reset();
					watch.start();
					logger.info("Transformation-2 Start:{} & Input Message is: {}",
							LocalDateTime.now(), readyToTransformMap);
					resultMap = new HashMap<>();
					resultMap = YBEDFormatter.init(
							StringUtils.join(store.getName().toLowerCase(), "_yellowbrick"),
							readyToTransformMap);
					watch.stop();
					logger.info("Transformation-2 End:{}, Elapsed:{} & OutputMessage is: {}",
							LocalDateTime.now(), watch.getTime(), resultMap);
				}
				return resultMap;
			}
		});

		JavaDStream<ResultMapHolder> kafkaPreIngestStream = transformedStream.map(
				new Function<Map<String, Object>, ResultMapHolder>() {

			// Also a field, so it persists across call() invocations.
			ResultMapHolder resultMapBean = null;

			@Override
			public ResultMapHolder call(Map<String, Object> finalTransformedMap) throws Exception {
				try {
					if (finalTransformedMap != null) {
						StopWatch watch = StopWatchSingleton.instance();
						watch.reset();
						watch.start();
						logger.info("Transformation-3 Start:{} & Input Message is: {}",
								LocalDateTime.now(), finalTransformedMap);
						resultMapBean = MapToArrayTransformerForYBIngestion.instance()
								.transformMapToOrderedArrayOfValues(finalTransformedMap, tableColumns);
						watch.stop();
						logger.info("Transformation-3 End:{}, Elapsed:{} & OutputMessage is: {}",
								LocalDateTime.now(), watch.getTime(),
								Arrays.toString(resultMapBean.getOutputRow()));
					}
				} catch (Exception e) {
					logger.error("", e);
				}
				return resultMapBean;
			}
		});
		
	

Please note the log statements in the code above. I grepped through the logs
of all the executors and found the record that is repeated. Here are the log
lines for the transformations of that message:


18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-1 Start:2018-10-26T20:36:18.975 & Input Message is: {"request_id":"7cad0cb2a7bf427481a83639f82ffcec"}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-1 End:2018-10-26T20:36:18.977, Elapsed:2 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.978 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.981, Elapsed:3 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 Start:2018-10-26T20:36:18.981 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 End:2018-10-26T20:36:18.982, Elapsed:0 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec]
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.983 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.990, Elapsed:1 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec]
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 Start:2018-10-26T20:36:18.991 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-2 End:2018-10-26T20:36:18.995, Elapsed:3 & OutputMessage is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 Start:2018-10-26T20:36:18.995 & Input Message is: {request_id=7cad0cb2a7bf427481a83639f82ffcec}
18/10/26 20:36:18 INFO sparkybstreaming.YBIngestionApp: Transformation-3 End:2018-10-26T20:36:18.996, Elapsed:1 & OutputMessage is: [7cad0cb2a7bf427481a83639f82ffcec]


Please note that this is not happening to all the records. For example, in
QA, where I am testing, out of 16000 messages only the single record above
appears twice in the DB (Kafka has only one copy of the message), and the
rest are present only once.
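
One possible reading of the code and log above, not confirmed anywhere in
this thread: each function keeps its result in a field (mappedJson, resultMap,
resultMapBean) rather than in a local variable, so a call that skips the if
block or lands in the catch returns whatever the previous call produced. The
log shows Transformation-1 only once but Transformation-2 and Transformation-3
again for the same record, which would fit an input of length 2 (for example
"{}") arriving right after it. The sketch below reproduces that pattern in
plain Java without Spark. The class names and the toy parse helper are
hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class StaleResultDemo {

    /** Mirrors the pattern in the thread: the result lives in a field. */
    static class FieldBackedMapper implements Function<String, Map<String, Object>> {
        private Map<String, Object> mappedJson = null;

        @Override
        public Map<String, Object> apply(String input) {
            if (input.length() != 2) {
                mappedJson = parse(input);
            }
            // For a skipped input this is still the previous record's map.
            return mappedJson;
        }
    }

    /** Alternative: a local variable, so a skipped input yields null. */
    static class LocalMapper implements Function<String, Map<String, Object>> {
        @Override
        public Map<String, Object> apply(String input) {
            Map<String, Object> result = null;
            if (input.length() != 2) {
                result = parse(input);
            }
            return result;
        }
    }

    /** Toy stand-in for the real JSON transformer (hypothetical). */
    static Map<String, Object> parse(String input) {
        Map<String, Object> map = new HashMap<>();
        map.put("raw", input);
        return map;
    }

    /** Applies the mapper to each input in order, as one task would for one partition. */
    static List<Map<String, Object>> run(Function<String, Map<String, Object>> mapper,
                                         List<String> inputs) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (String input : inputs) {
            out.add(mapper.apply(input));
        }
        return out;
    }
}
```

Given a normal record followed by "{}", the field-backed mapper emits the first
record's map twice, while the local-variable version emits null for the second
input. In the original code the downstream functions would need the same
change, since they also return a field when their input is null. Whether this
is what happened here could be checked by looking at the Kafka message that
directly follows the duplicated one.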

Let me know if you need anything else. Once again, thanks for looking into
this.




Re: Is spark not good for ingesting into updatable databases?

Posted by Jörn Franke <jo...@gmail.com>.
Do you have some code that you can share?

Maybe it is something in your code that unintentionally duplicates it?

Maybe your source (e.g. the application putting the messages on Kafka) already duplicates them?
Exactly-once processing needs to be ensured end to end.
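
As one illustration of what end-to-end handling can involve on the sink side,
the sketch below skips any record whose Kafka partition and offset were
already written, so a replayed batch does not insert twice. It is plain Java
under stated assumptions: the OffsetLedger class is hypothetical, it keeps its
state in memory only, and a real pipeline would have to persist the offsets
together with the data, ideally in the same transaction.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory ledger of the highest offset written per partition. */
class OffsetLedger {
    private final Map<Integer, Long> highestWritten = new HashMap<>();

    /**
     * Returns true if the record at (partition, offset) has not been written yet
     * and marks it as written; returns false for a replayed record.
     * Assumes offsets within a partition are offered in increasing order.
     */
    synchronized boolean tryAdvance(int partition, long offset) {
        Long last = highestWritten.get(partition);
        if (last != null && offset <= last) {
            return false;
        }
        highestWritten.put(partition, offset);
        return true;
    }
}
```

This guards against replays of the same offsets. It would not catch a
duplicate that is created inside a transformation, after the link between a
record and its offset has been lost.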
