Posted to dev@crunch.apache.org by "Anuj Ojha (JIRA)" <ji...@apache.org> on 2013/07/02 23:43:20 UTC

[jira] [Updated] (CRUNCH-232) DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.

     [ https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anuj Ojha updated CRUNCH-232:
-----------------------------

    Description: 
DoFn's initialize method gets called twice, whereas cleanup gets called only once, when a join is performed on two PTables.
 
Sample Test:
{code borderStyle=solid} 
        final Configuration config = HBaseTest.getConf();
        final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
        final PCollection<String> collectionHelper1 = pipeline.readTextFile(HBaseTest.class.getResource(
                "/HbaseTestFile.txt").toString());
 
        final PCollection<String> collectionHelper2 = pipeline.readTextFile(HBaseTest.class.getResource(
                "/HbaseTestFile2.txt").toString());
 
        final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
                Avros.tableOf(Avros.ints(), Avros.strings()));
 
        final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
                Avros.tableOf(Avros.ints(), Avros.strings()));
 
        final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
 
        final PCollection<String> joinedStrings = joinedTable.parallelDo(
                new MapFn<Pair<Integer, Pair<String, String>>, String>() {
                    private static final long serialVersionUID = -8796426750247480646L;
 
                    @Override
                    public String map(final Pair<Integer, Pair<String, String>> input) {
                        return input.second().first() + "/" + input.second().second();
                    }
                }, Avros.strings());
 
        System.out.println(joinedStrings.materialize().iterator().hasNext());
 {code}
 
The two DoFns (DoFnCheck and DoFnCheck2) look something like this:
 
{code} 
public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
    /** Serialization version id. */
    private static final long serialVersionUID = 6780749658216132026L;
 
    @Override
    public void initialize() {
        System.out
                .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    }
 
    @Override
    public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
        System.out
                .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    }
 
    @Override
    public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
        // Log each call, then emit the input line under the constant key 1.
        System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
 
        final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
 
        emitter.emit(pair);
    }
}
{code} 
 
The console looks like this:
 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
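
To make the mismatch easier to spot than by reading println banners, the calls could be tallied instead. The following is only a minimal sketch: LifecycleCounter is a hypothetical helper, not part of Crunch, and it assumes the pipeline runs in a single JVM (for example in local mode), so that one shared instance sees every call. A DoFn's initialize() and cleanup() would call onInitialize and onCleanup respectively; main below simply replays the sequence from the console output above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Crunch): tallies initialize() and
 * cleanup() calls per DoFn name so an imbalance can be checked directly.
 */
final class LifecycleCounter {
    // name -> {initialize count, cleanup count}
    private final Map<String, int[]> counts = new LinkedHashMap<>();

    public synchronized void onInitialize(String name) {
        slot(name)[0]++;
    }

    public synchronized void onCleanup(String name) {
        slot(name)[1]++;
    }

    /** Returns initialize() calls minus cleanup() calls; 0 means balanced. */
    public synchronized int imbalance(String name) {
        int[] c = counts.get(name);
        return c == null ? 0 : c[0] - c[1];
    }

    private int[] slot(String name) {
        return counts.computeIfAbsent(name, k -> new int[2]);
    }

    public static void main(String[] args) {
        LifecycleCounter counter = new LifecycleCounter();
        // Replay the reported sequence: initialize, (process), cleanup, initialize.
        counter.onInitialize("DoFnCheck");
        counter.onCleanup("DoFnCheck");
        counter.onInitialize("DoFnCheck");
        System.out.println("imbalance = " + counter.imbalance("DoFnCheck")); // prints "imbalance = 1"
    }
}
```

A non-zero imbalance after the pipeline finishes would confirm that some initialize() call had no matching cleanup(). On a real cluster each task runs in its own JVM, so a shared in-memory counter like this would not see all calls there.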

    
> DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-232
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-232
>             Project: Crunch
>          Issue Type: Bug
>          Components: MapReduce Patterns
>    Affects Versions: 0.6.0
>            Reporter: Anuj Ojha
>            Priority: Critical
