Posted to issues@camel.apache.org by "Grzegorz Grzybek (JIRA)" <ji...@apache.org> on 2015/03/05 10:29:38 UTC

[jira] [Commented] (CAMEL-7318) Concurrency on HDFS Consumer not working efficiently

    [ https://issues.apache.org/jira/browse/CAMEL-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348469#comment-14348469 ] 

Grzegorz Grzybek commented on CAMEL-7318:
-----------------------------------------

Actually, the file is not being read twice.
I've set up two consumers and watched the behavior under a debugger.
The code you mentioned (in org.apache.camel.component.hdfs2.HdfsInputStream#createInputStream()):
{code:java}
        info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
{code}
doesn't return {{false}} when it can't rename; instead it throws an exception with the following stack trace (in the "file://" case):
{noformat}
java.io.FileNotFoundException: File file:/data/ggrzybek/sources/github.com/grgrzybek/camel/components/camel-hdfs2/target/test/multiple-consumers/file-0194.txt does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:722)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
	at org.apache.hadoop.fs.RawLocalFileSystem.rename(RawLocalFileSystem.java:334)
	at org.apache.hadoop.fs.ChecksumFileSystem.rename(ChecksumFileSystem.java:503)
	at org.apache.camel.component.hdfs2.HdfsInputStream.createInputStream(HdfsInputStream.java:49)
	at org.apache.camel.component.hdfs2.HdfsConsumer.doPoll(HdfsConsumer.java:140)
	at org.apache.camel.component.hdfs2.HdfsConsumer.poll(HdfsConsumer.java:98)
	at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)
	at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{noformat}
so Camel then invokes org.apache.camel.spi.PollingConsumerPollStrategy#rollback(), simply skips this file and moves on to the next one.
I'll check how it works in the "hdfs://" case.
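Since LocalFileSystem's rename throws rather than returning {{false}}, a consumer that wants to claim a file atomically has to guard against both outcomes. Below is a minimal, self-contained sketch of that pattern using java.nio instead of the Hadoop FileSystem API (the {{claimFile}} helper and the file names are hypothetical, for illustration only): both a rename that reports failure and one that throws are treated as "a competing consumer claimed the file first".

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RenameClaim {

    /**
     * Attempt to claim a file by renaming it to a suffixed path.
     * Returns true only if this caller performed the rename; a missing
     * source file (already claimed by a competing consumer) yields false
     * instead of propagating the exception to the poll loop.
     */
    static boolean claimFile(Path actual, Path suffixed) {
        try {
            // Files.move throws NoSuchFileException (an IOException)
            // if 'actual' has already been renamed away.
            Files.move(actual, suffixed);
            return true;
        } catch (IOException e) {
            // Another consumer won the race - skip this file.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("claim-demo");
        Path actual = Files.createFile(dir.resolve("file-0001.txt"));
        Path suffixed = dir.resolve("file-0001.txt.opened");

        System.out.println(claimFile(actual, suffixed)); // true: first claim wins
        System.out.println(claimFile(actual, suffixed)); // false: source is gone
    }
}
```

With such a boolean-returning helper, the poll loop can simply {{continue}} to the next file on a failed claim, which is effectively what the rollback path achieves today.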

> Concurrency on HDFS Consumer not working efficiently
> ----------------------------------------------------
>
>                 Key: CAMEL-7318
>                 URL: https://issues.apache.org/jira/browse/CAMEL-7318
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-hdfs
>    Affects Versions: 2.11.2
>            Reporter: Martha Obrinteschi
>            Assignee: Grzegorz Grzybek
>            Priority: Minor
>              Labels: concurrency, parallel
>
> If we have two HDFS consumers, each file is processed twice (once by each consumer, one waiting for the other), so the consumers are not working in parallel. If we add the change below, the consumers will work as a team and the transfer will go faster.
> This happens because no exception is thrown: the rename method just returns true or false and everything carries on as if nothing had happened :).
> In order to fix this we could add in HdfsInputStream: 49
> boolean renamed = info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
> if (!renamed) {
>     throw new IOException("Rename to " + ret.suffixedPath + " did not succeed.");
> }
> And also in HdfsConsumer: 150
> try {
>     this.rwlock.writeLock().lock();
>     this.istream = HdfsInputStream.createInputStream(fileStatuses[i].getPath().toString(), this.config);
> } catch (IOException ioe) {
>     this.rwlock.writeLock().unlock();
>     log.info(ioe.getMessage() + " - the rename failed, skipping this file and moving on to the next one.");
>     continue;
> }
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)