Posted to user@crunch.apache.org by Lucy Chen <lu...@gmail.com> on 2015/03/25 00:56:27 UTC

question about Crunch pipeline

Hi, I recently started using Crunch and ran into an issue that really
confuses me. When I run the following code, result.succeeded() is 1 (true);

however, if I add the single line "labelData.write(At.textFile("/labelData"),
WriteMode.OVERWRITE);" before running the pipeline, result.succeeded() is
0 (false).

I wonder why this happens and what the underlying cause of the failure
might be. Did I do something wrong?


Thanks for your help.


Lu


public class LabelDataCollector implements Serializable {

  // Convert the raw text inputs into FeatObject objects.
  public PCollection<FeatObject> getFeatSamples(Pipeline pipeline, String labelDataPath) {
    PCollection<String> rawInputs = pipeline.readTextFile(labelDataPath);
    PCollection<String> validLines = rawInputs.filter(new FeatValueNullFilter());

    PType<FeatObject> featObjectType = Avros.reflects(FeatObject.class);
    PCollection<FeatObject> featSamples =
        validLines.parallelDo(new GenerateFeatSample(), featObjectType);
    return featSamples;
  }


  public static void main(String[] args) throws IOException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "file:///");
    conf.set("mapred.job.tracker", "local");

    Pipeline pipeline = new MRPipeline(LabelDataCollector.class, "LabelDataCollector", conf);

    LabelDataCollector ldc = new LabelDataCollector();
    PCollection<FeatObject> labelData = ldc.getFeatSamples(pipeline, "/feat_inputs");

    // Execute the pipeline as a MapReduce job.
    PipelineResult result = pipeline.done();
    System.out.println(result.succeeded() ? 0 : 1);
  }
}


public class GenerateFeatSample extends DoFn<String, FeatObject> {

  private static final Logger logger = Logger.getLogger(FeatObject.class.getName());
  private static final long serialVersionUID = 1L;

  @Override
  public void process(String input, Emitter<FeatObject> emitter) {
    if (input == null || input.isEmpty()) {
      logger.error("Input is null or empty");
      return;
    }
    logger.info("convert the input string to a feature object!");
    emitter.emit(new FeatObject(input));
  }
}


public class FeatObject implements java.io.Serializable, Cloneable {

  private static final long serialVersionUID = 1L;
  private String labelID;
  private String sampleID;
  private int pos_neg_ind;
  private Map<String, Double> feat_val_pair;
  private int num_of_feat;

  public FeatObject(String input) {
    ...
  }
}

Re: question about Crunch pipeline

Posted by Josh Wills <jo...@gmail.com>.
Hey Lucy,

Without the write(To.textFile("/labeledData"), WriteMode.OVERWRITE) line,
the Crunch pipeline isn't actually doing anything; succeeded() returns true
in that case because Crunch "succeeded" at not running anything. I know,
it's kind of weird. A couple of guesses as to what's wrong:

1) The FeatObject class needs a no-arg constructor for Avro reflection to
work properly, or
2) The pipeline is failing when it tries to create the labeledData
directory under the root directory because of a filesystem permissions
problem. Try writing out to /tmp/labeledData and see if that does the
trick (rough sketch of both fixes below).
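
Untested sketch, just to illustrate both guesses; I'm assuming your
labelData variable and whatever fields FeatObject really has:

// 1) Add a no-arg constructor so Avros.reflects(FeatObject.class) can
//    instantiate FeatObject via reflection:
public FeatObject() {
}

// 2) Write to a location you definitely have permission to create,
//    e.g. somewhere under /tmp:
labelData.write(To.textFile("/tmp/labeledData"), WriteMode.OVERWRITE);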

On my machine, log files from MR jobs run in local mode get written out
under /tmp/hadoop-$USER/mapred, so that's a good place to look. If you add
a keep.failed.task.files=true setting to the Configuration you're using for
your job, the log files for the failed tasks should stick around after the
job finishes so you can examine them to debug the problem.
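
In code, that's just one more conf.set before you construct the
MRPipeline (sketch):

Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
conf.set("mapred.job.tracker", "local");
// Keep the files of failed tasks around so you can inspect their logs
// under /tmp/hadoop-$USER/mapred after the job finishes:
conf.set("keep.failed.task.files", "true");
Pipeline pipeline = new MRPipeline(LabelDataCollector.class, "LabelDataCollector", conf);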

Best,
Josh

