Posted to users@camel.apache.org by ddelizia <da...@unic.com> on 2012/09/27 12:43:35 UTC

Best Strategy to process a csv from a given line

Hello,
I have to consume a big CSV file (14 MB) and start reading it from a given
line number. The line number is stored in a file on an FTP server; at the end
of the process I have to update it with the last line I read.
My approach was to unmarshal the file with .unmarshal(csv) and create a
processor that:

1- Starts a route to download the line number file locally
2- Reads the file and gets the line number
3- Loops over the object (List<List<String>>) exchange.getIn().getBody()
and appends the data to a string buffer to create the output (updating the
line count)
4- Updates the line number in the local file
5- Starts a route to upload the line number file.

Translated into Camel routes:

from("direct:" + ROUTE_INCREMENTAL)
				.id("direct_"+ROUTE_INCREMENTAL)
				.unmarshal(csv)
				.process(new Processor() {
					@Override
					public void process(Exchange exchange) throws Exception {

						getContext().startRoute(ROUTE_GET_CUTTENT_POSITION);
						File file = new File(evaluatePathWithProperties(
								localFilePath, localFileName));

						
						
						if (file.exists()) {
							DataInputStream in = new DataInputStream(
									new FileInputStream(file));
							BufferedReader br = new BufferedReader(
									new InputStreamReader(in));
							String firstLine = br.readLine();
							int currentValue = 1;
							if (firstLine != null) {
								try {
									currentValue = Integer.parseInt(firstLine);
								} catch (Exception e) {
									System.out
											.println("Error reading position file check if firstline is just a number");
								}

							}
							in.close();

							List<List<String>> data = (List<List<String>>) exchange
									.getIn().getBody();
							StringBuffer s = new StringBuffer();
							s.append("<Message><ImageMessage><assetIds>");
							int counter = 0;
							for (int i = currentValue; i < data.size(); i++) {
								currentValue++;
								counter++;
								s.append("\n\t<assetId>" + data.get(i).get(0)
										+ "</assetId>");

								if (counter == 20) {
									s.append("\n</assetIds></ImageMessage>");
									s.append("\n<ImageMessage><assetIds>");
									counter = 0;
								}
							}
							s.append("\n</assetIds></ImageMessage></Message>");
							exchange.getIn().setBody(s.toString());

							FileOutputStream fos = new FileOutputStream(file);
							DataOutputStream dos = new DataOutputStream(fos);
							Integer integer = new Integer(currentValue);
							dos.write(integer.toString().getBytes());
							dos.close();

							getContext().startRoute(ROUTE_SET_CURRENT_POSITION);
						}
					}
				})
				// .to("file:/opt/servicemix/data/smx-data/outbox/adobes7");
				.split(xpath(splitXml))
				.to("jms:" + jmsincrementalQueue + "?jmsMessageType=Text");

		from(
				ftpDataType + "://" + ftpDataUsername + "@" + ftpDataUrl
						+ ftpDataPath + "?password=" + ftpDataPassword
						+ "&fileName=" + ftpDataFilename
						+ "&noop=true&idempotent=false").noAutoStartup()
				.id(ROUTE_GET_CUTTENT_POSITION)
				.to("file:" + localFilePath + "?fileName=" + localFileName)
				.process(new Processor() {

					@Override
					public void process(Exchange arg0) throws Exception {
						getContext().getShutdownStrategy().setTimeout(ROUTE_STOP_TIMEOUT);
						getContext().stopRoute(ROUTE_GET_CUTTENT_POSITION);
					}
				});

		from("file:" + localFilePath + "?fileName=" + localFileName)
				.noAutoStartup()
				.id(ROUTE_SET_CURRENT_POSITION)
				.to(ftpDataType + "://" + ftpDataUsername + "@" + ftpDataUrl
						+ ftpDataPath + "?password=" + ftpDataPassword
						+ "&fileName=" + ftpDataFilename
						+ "&noop=true&idempotent=false")
				.process(new Processor() {
					@Override
					public void process(Exchange arg0) throws Exception {
						getContext().getShutdownStrategy().setTimeout(ROUTE_STOP_TIMEOUT);
						getContext().stopRoute(ROUTE_SET_CURRENT_POSITION);
					}
				});

But in this case I get an exception:
org.apache.camel.CamelExecutionException: Exception occurred during
execution on the exchange: Exchange[Message: [[Id, Path, Resolution,
Modifier, PostModifier, MaskPath, Expiration, ThumbType, ThumbRes, UserData,
Anchor, ImageSet, Map, Targets, LastModified, TimeStamp, Size, DigimarcInfo,
AssetType], [product-6-model-big,
_optimized_/9d2/9d217286-ded8-4148-b13e-a2b0907c4e75.tif, 72.0, , , , , , ,
, 304,783, , , , Wed Aug 29 12:34:53 IST 2012, 1346240093911, 609,1566, ,
IMAGE], [247593-011_psfront1,
_optimized_/059/059b0e91-7074-46c1-a7fb-2b5f99f7040d.tif, 72.0, , , , , , ,
, 69,117, , , , Thu Aug 30 12:05:09 IST 2012, 1346324709095, 138,235, ,
IMAGE], [247593-011_psback2,
_optimized_/8f3/8f3f7b61-c970-4864-80e9-a7547e1a7abc.tif, , , , , , , , ,
70,86, , , , Thu Aug 30 12:05:24 IST 2012, 1346324724979, 140,172, , IMAGE],
[247593-011_psdet, _optimized_/70f/70fc518f-87fb-4167-8d2e-fb71ca7f92ae.tif,
72.0, , , , , , , , 57,71, , , , Thu Aug 30 12:05:27 IST 2012,
1346324727056, 115,142, , IMAGE], [247593-011_mdfront3,
_optimized_/668/668994ff-0def-4ad6-b361-2658d486376b.tif, 72.0, , , , , , ,
, 57,69,... [Body clipped after 1000 chars, total length is 15819508]]
Caused by: java.lang.OutOfMemoryError: Java heap space

I set the memory to 1024MB, but I still get this error.
Any idea how to solve it? And is this the correct approach for what I want
to do?
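
One way to keep memory use flat, sketched here under assumptions: the exception message above reports a body of 15819508 characters, which suggests the whole file is held in memory as nested lists before the loop even starts. The sketch below instead reads the file line by line, skips the lines already processed, and hands over one batch of 20 asset IDs at a time. The class AssetIdBatcher and the BatchSink interface are hypothetical names, not Camel API, and the comma split is deliberately naive. Inside a Camel route, the same idea would usually be expressed with the Splitter in streaming mode rather than with hand-written I/O.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Camel: streams a CSV and batches asset IDs. */
final class AssetIdBatcher {

    /** Receives one finished XML fragment at a time, e.g. to send it to a queue. */
    interface BatchSink {
        void accept(String imageMessageXml) throws IOException;
    }

    /**
     * Skips the first startLine lines, then emits the first column of every
     * remaining line in batches of batchSize. Returns the number of lines
     * read in total, which is the value to store as the new position.
     */
    static int process(Reader csv, int startLine, int batchSize, BatchSink sink)
            throws IOException {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        int lineNumber = 0;
        List<String> batch = new ArrayList<String>(batchSize);
        // try-with-resources closes the reader even if the sink throws.
        try (BufferedReader reader = new BufferedReader(csv)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                if (lineNumber <= startLine) {
                    continue; // header or already handled in an earlier run
                }
                // Assumption: the first column contains no quoted commas.
                int comma = line.indexOf(',');
                batch.add(comma < 0 ? line : line.substring(0, comma));
                if (batch.size() == batchSize) {
                    sink.accept(toXml(batch));
                    batch.clear();
                }
            }
        }
        if (!batch.isEmpty()) {
            sink.accept(toXml(batch)); // last, possibly shorter batch
        }
        return lineNumber;
    }

    // Note: IDs are not XML-escaped here; real data may need escaping.
    private static String toXml(List<String> ids) {
        StringBuilder xml = new StringBuilder("<ImageMessage><assetIds>");
        for (String id : ids) {
            xml.append("<assetId>").append(id).append("</assetId>");
        }
        return xml.append("</assetIds></ImageMessage>").toString();
    }
}
```

With startLine set to 1 only the header row is skipped, which matches the default of 1 used in the processor above. Only one batch lives in memory at a time, so the heap needed no longer grows with the file size.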





--
View this message in context: http://camel.465427.n5.nabble.com/Best-Strategy-to-process-a-csv-from-a-given-line-tp5720112.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Best Strategy to process a csv from a given line

Posted by Christian Müller <ch...@gmail.com>.
Could you please provide the full stack trace?
Which parameters did you pass to the JVM? "I set the memory to 1024MB" is
very unspecific...
Which Camel version and OS do you use?
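
To make these answers easy to collect, here is a minimal sketch that reports what the running JVM actually sees. It assumes the limit was meant to be set with the standard -Xmx option; HeapCheck is a hypothetical class name. If the reported value is far below 1024 MB, the setting probably did not reach the JVM that runs the route.

```java
/** Hypothetical diagnostic helper: reports the effective heap limit and platform. */
final class HeapCheck {

    /** Maximum heap the JVM will attempt to use, in megabytes. */
    static long maxHeapMegabytes() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    /** One-line summary suitable for pasting into a mailing list reply. */
    static String describe() {
        return "Max heap: " + maxHeapMegabytes() + " MB"
                + ", Java " + System.getProperty("java.version")
                + ", OS " + System.getProperty("os.name")
                + " " + System.getProperty("os.version");
    }
}
```

Logging HeapCheck.describe() once at startup, from inside the same process that runs the routes, is enough to confirm the setting.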

And you should rework your processor. It will not close the streams in every
case where an exception happens. And IMO, it should not be an anonymous inner
class if it's as complicated as your processor...
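
A minimal sketch of what that rework could look like for the position file, assuming Java 7 or later: try-with-resources closes the reader even when reading or parsing fails, and the logic lives in a small named class that a named Processor could call. PositionFile and its methods are hypothetical names, not taken from the original code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that reads and writes the stored line number. */
final class PositionFile {
    private final Path path;
    private final int defaultPosition;

    PositionFile(Path path, int defaultPosition) {
        this.path = path;
        this.defaultPosition = defaultPosition;
    }

    /** Returns the stored position, or the default if the file is missing or empty. */
    int read() throws IOException {
        if (!Files.exists(path)) {
            return defaultPosition;
        }
        // The reader is closed automatically, whether or not an exception is thrown.
        try (BufferedReader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
            String firstLine = reader.readLine();
            if (firstLine == null || firstLine.trim().isEmpty()) {
                return defaultPosition;
            }
            return Integer.parseInt(firstLine.trim());
        } catch (NumberFormatException e) {
            // Fail loudly instead of silently restarting from the default position.
            throw new IOException("Position file " + path + " must contain a single number", e);
        }
    }

    /** Overwrites the file with the new position. */
    void write(int position) throws IOException {
        Files.write(path, Integer.toString(position).getBytes(StandardCharsets.UTF_8));
    }
}
```

Unlike the original processor, this version reports a malformed position file as an error rather than printing a message and continuing from line 1. Whether that is the desired behaviour is a design choice.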

Best,
Christian
