Posted to dev@crunch.apache.org by "Anuj Ojha (JIRA)" <ji...@apache.org> on 2013/07/02 23:43:20 UTC
[jira] [Updated] (CRUNCH-232) DoFn initialize method gets called twice whereas cleanup gets called only once when join is performed on two PTables.
[ https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anuj Ojha updated CRUNCH-232:
-----------------------------
Description:
A DoFn's initialize() method gets called twice, whereas cleanup() gets called only once, when a join is performed on two PTables.
Sample Test:
{code borderStyle=solid}
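import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;

// Test method body. HBaseTest and MaraCheckTest are the reporter's own test classes (not shown).
final Configuration config = HBaseTest.getConf();
final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
// Read the two input files from the test classpath.
final PCollection<String> collectionHelper1 = pipeline.readTextFile(
    HBaseTest.class.getResource("/HbaseTestFile.txt").toString());
final PCollection<String> collectionHelper2 = pipeline.readTextFile(
    HBaseTest.class.getResource("/HbaseTestFile2.txt").toString());
// Key each line with a DoFn, producing two PTables.
final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
    Avros.tableOf(Avros.ints(), Avros.strings()));
final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
    Avros.tableOf(Avros.ints(), Avros.strings()));
// Inner join on the key, then format each joined pair as "left/right".
final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
final PCollection<String> joinedStrings = joinedTable.parallelDo(
    new MapFn<Pair<Integer, Pair<String, String>>, String>() {
      private static final long serialVersionUID = -8796426750247480646L;

      @Override
      public String map(final Pair<Integer, Pair<String, String>> input) {
        return input.second().first() + "/" + input.second().second();
      }
    }, Avros.strings());
// Iterating the materialized output triggers the pipeline run.
System.out.println(joinedStrings.materialize().iterator().hasNext());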
{code}
Both DoFns (DoFnCheck and DoFnCheck2) look something like this:
{code}
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;

public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
  private static final long serialVersionUID = 6780749658216132026L;

  @Override
  public void initialize() {
    // Runs before process() is called.
    System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  }

  @Override
  public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
    // Runs when the task finishes; the report expects one call per initialize().
    System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  }

  @Override
  public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
    System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    // Every input line is emitted under the constant key 1.
    final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
    emitter.emit(pair);
  }
}
{code}
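Since DoFnCheck emits every line under the constant key 1, the join in the sample test pairs every line of one file with every line of the other. The sketch below models that in plain Java (no Crunch classes) so the expected output is easy to reason about. It assumes DoFnCheck2, which is not shown, keys its lines the same way; the class JoinSketch and its methods are hypothetical names, and unlike this model, a real Crunch join does not promise any particular output order.

{code}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the sample test's join; no Crunch classes are used. */
public class JoinSketch {

    /** Mirrors DoFnCheck.process(): each line is emitted under the constant key 1. */
    public static Map<Integer, List<String>> keyByOne(List<String> lines) {
        Map<Integer, List<String>> table = new LinkedHashMap<>();
        for (String line : lines) {
            table.computeIfAbsent(1, k -> new ArrayList<>()).add(line);
        }
        return table;
    }

    /** Inner join on the key, then the MapFn's "left/right" formatting. */
    public static List<String> joinAndFormat(Map<Integer, List<String>> left,
                                             Map<Integer, List<String>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : left.entrySet()) {
            List<String> matches = right.get(entry.getKey());
            if (matches == null) {
                continue; // an inner join drops keys missing on either side
            }
            for (String l : entry.getValue()) {
                for (String r : matches) {
                    out.add(l + "/" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical file contents; the real HbaseTestFile*.txt files are not shown.
        List<String> file2Lines = List.of("a", "b");      // feeds ptable1
        List<String> file1Lines = List.of("x", "y", "z"); // feeds ptable2
        List<String> joined = joinAndFormat(keyByOne(file2Lines), keyByOne(file1Lines));
        System.out.println(joined.size() + " " + joined);
    }
}
{code}

Under the same assumption, the sample test's final println would print true whenever both input files contain at least one line, and false otherwise.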
The console looks like this:
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
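Reading the banners by eye works for a single task, but the imbalance is easier to check with counters. The following is a hypothetical helper (LifecycleTracker is not part of Crunch) that counts initialize(), process() and cleanup() calls; its main() replays the call order from the console output above and reports it as unbalanced.

{code}
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not part of Crunch) that counts DoFn lifecycle calls,
 * so an initialize()/cleanup() mismatch shows up as numbers rather than
 * as banner lines in the console.
 */
public class LifecycleTracker implements Serializable {
    private static final long serialVersionUID = 1L;

    private final AtomicInteger initializes = new AtomicInteger();
    private final AtomicInteger processed = new AtomicInteger();
    private final AtomicInteger cleanups = new AtomicInteger();

    public void onInitialize() { initializes.incrementAndGet(); }
    public void onProcess()    { processed.incrementAndGet(); }
    public void onCleanup()    { cleanups.incrementAndGet(); }

    /** True when the number of initialize() calls equals the number of cleanup() calls. */
    public boolean isBalanced() {
        return initializes.get() == cleanups.get();
    }

    public String summary() {
        return "initialize=" + initializes.get()
            + ", process=" + processed.get()
            + ", cleanup=" + cleanups.get()
            + (isBalanced() ? " (balanced)" : " (UNBALANCED)");
    }

    public static void main(String[] args) {
        LifecycleTracker tracker = new LifecycleTracker();
        // Replay the call order seen in the console output of the report.
        tracker.onInitialize();
        tracker.onProcess();
        tracker.onCleanup();
        tracker.onInitialize();
        System.out.println(tracker.summary()); // initialize=2, process=1, cleanup=1 (UNBALANCED)
    }
}
{code}

In DoFnCheck, such a tracker could be held as a field and updated from initialize(), process() and cleanup(). Each MapReduce task generally works on its own deserialized copy of the DoFn, so the counts are per task rather than per pipeline.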
> DoFn initialize method gets called twice whereas cleanup gets called only once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: CRUNCH-232
> URL: https://issues.apache.org/jira/browse/CRUNCH-232
> Project: Crunch
> Issue Type: Bug
> Components: MapReduce Patterns
> Affects Versions: 0.6.0
> Reporter: Anuj Ojha
> Priority: Critical