Posted to user@hadoop.apache.org by Arshad Ali Sayed <ar...@gmail.com> on 2015/08/06 16:14:53 UTC

Reading data from FTP Server in Hadoop/Cascading

I want to read data from an FTP server. I provide the path of the file, which resides on the FTP
server, in the format ftp://Username:Password@host/path. When I use a plain MapReduce program
to read data from the file, it works fine. I want to read the same file through the Cascading
framework, using the Hfs tap, but it throws the following exception:

java.io.IOException: Stream closed
    at org.apache.hadoop.fs.ftp.FTPInputStream.close(FTPInputStream.java:98)
    at java.io.FilterInputStream.close(Unknown Source)
    at org.apache.hadoop.util.LineReader.close(LineReader.java:83)
    at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:168)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:254)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:440)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
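
For comparison, the plain MapReduce job that reads the same ftp:// path runs without any error.
It looks roughly like the sketch below (the class name, host, and paths here are placeholders,
not my real values):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class FTPReadDemo {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(FTPReadDemo.class);
        conf.setJobName("ftp-read-demo");

        // Input comes straight from the FTP server via Hadoop's ftp:// scheme.
        FileInputFormat.setInputPaths(conf, new Path("ftp://user:pwd@xx.xx.xx.xx/input1"));
        FileOutputFormat.setOutputPath(conf, new Path("output"));

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        // Map-only identity job: just copies the lines it reads from FTP.
        conf.setMapperClass(IdentityMapper.class);
        conf.setNumReduceTasks(0);

        JobClient.runJob(conf);
    }
}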

Below is the Cascading code with which I am reading the file:
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class FTPWithHadoopDemo {
    public static void main(String[] args) {
        // Source tap: read lines from the file on the FTP server.
        Tap source = new Hfs(new TextLine(new Fields("line")), "ftp://user:pwd@xx.xx.xx.xx//input1");
        Tap sink = new Hfs(new TextLine(new Fields("line1")), "OP\\op", SinkMode.REPLACE);
        // Split each line on whitespace, group the tokens, and count each group.
        Pipe pipe = new Pipe("First");
        pipe = new Each(pipe, new RegexSplitGenerator("\\s+"));
        pipe = new GroupBy(pipe);
        Pipe tailpipe = new Every(pipe, new Count());
        FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(tailpipe, sink);
        new HadoopFlowConnector().connect(flowDef).complete();
    }
}

I tried to look in the Hadoop source code for this exception. I found that the MapTask class has
a method runOldMapper which deals with the stream, and in the same method there is a finally
block where the stream gets closed. When I remove that close call from the code, it works fine.
Below is the code:
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldMapper(final JobConf job,
        final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical,
        TaskReporter reporter)
        throws IOException, InterruptedException, ClassNotFoundException {
    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
            splitIndex.getStartOffset());

    updateJobWithSplit(job, inputSplit);
    reporter.setInputSplit(inputSplit);

    RecordReader<INKEY, INVALUE> in = isSkipping()
            ? new SkippingRecordReader<INKEY, INVALUE>(inputSplit, umbilical, reporter)
            : new TrackedRecordReader<INKEY, INVALUE>(inputSplit, job, reporter);
    job.setBoolean("mapred.skip.on", isSkipping());

    int numReduceTasks = conf.getNumReduceTasks();
    LOG.info("numReduceTasks: " + numReduceTasks);
    MapOutputCollector collector = null;
    if (numReduceTasks > 0) {
        collector = new MapOutputBuffer(umbilical, job, reporter);
    } else {
        collector = new DirectMapOutputCollector(umbilical, job, reporter);
    }
    MapRunnable<INKEY, INVALUE, OUTKEY, OUTVALUE> runner =
            ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

    try {
        runner.run(in, new OldOutputCollector(collector, conf), reporter);
        collector.flush();
    } finally {
        // close
        in.close(); // close input
        collector.close();
    }
}
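
My guess is that the underlying FTP stream has already been closed by the time this finally
block runs, and FTPInputStream.close() throws "Stream closed" when it is called a second time.
Just to illustrate that idea (this is only a sketch of mine, not something that can be plugged
in without patching Hadoop's FTPFileSystem), a wrapper whose close() is idempotent would avoid
the failure on the second close:

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical wrapper: ignores repeated close() calls instead of throwing
// the way FTPInputStream.close() does when the stream is already closed.
public class IdempotentCloseInputStream extends FilterInputStream {
    private boolean closed = false;

    public IdempotentCloseInputStream(InputStream in) {
        super(in);
    }

    @Override
    public synchronized void close() throws IOException {
        if (closed) {
            return; // second close: do nothing instead of throwing "Stream closed"
        }
        closed = true;
        super.close();
    }
}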

I have asked the same question on the cascading-user group, and they replied: "HFS supports
whatever Hadoop supports, so if you supply a URI in the format ftp://, it should do the right
thing." But I am still getting this exception.
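
To check whether the FTP file system itself works outside of Cascading, the same URI can be
opened directly through Hadoop's FileSystem API (which, as far as I understand, is what the Hfs
tap uses underneath). A rough sketch, with placeholder host and path:

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FTPSmokeTest {
    public static void main(String[] args) throws Exception {
        String uri = "ftp://user:pwd@xx.xx.xx.xx/input1";
        // Resolve the ftp:// scheme through Hadoop's FileSystem factory.
        FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
        InputStream in = null;
        try {
            // Copy the remote file to stdout; if this works, the FTP
            // FileSystem itself is fine and the problem is in the job.
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}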
Please assist me in solving this problem.


Thanks,
Arshadali