You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hadoop.apache.org by xeonmailinglist <xe...@gmail.com> on 2015/09/08 01:36:18 UTC

catch MapReduce calls with AOP?

Hi,

In [1] I show an wordcount example. I am trying to pointcut the 
invocations of |output.collect| and the method |cleanup| with AOP, but 
it is very difficult to do this.
I have tried to set |org.apache.hadoop.mapred.JobConf| and 
|org.apache.hadoop.mapreduce.Job;| in beans in order to intercept theses 
calls, but I can’t.

Is it possible to do this? Am I trying to do something that is impossible?

[1] My wordcount example

|public class MyWordCount { public static class MyMap extends Mapper { 
private final static IntWritable one = new IntWritable(1); private Text 
word = new Text(); private MedusaDigests parser = new MedusaDigests(); 
public void map(LongWritable key, Text value, OutputCollector<Text, 
IntWritable> output, Reporter reporter) throws IOException { 
StringTokenizer itr = new StringTokenizer(value.toString()); while 
(itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, 
one); parser.updateDigest(word.getBytes(), 
ByteBuffer.allocate(4).putInt(one.get()).array()); } } public void 
run(Context context) throws IOException, InterruptedException { 
setup(context); try { while (context.nextKeyValue()) { 
System.out.println("Key: " + context.getCurrentKey() + " Value: " + 
context.getCurrentValue()); map(context.getCurrentKey(), 
context.getCurrentValue(), context); } } finally { cleanup(context); } } 
protected void cleanup(Context context) throws IOException, 
InterruptedException { parser.cleanup(context); } } public static class 
MyReducer extends Reducer { private IntWritable result = new 
IntWritable(); MedusaDigests parser = new MedusaDigests(); public void 
reduce(Text key, Iterable<IntWritable> values, Context context ) throws 
IOException, InterruptedException { int sum = 0; for (IntWritable val : 
values) { System.out.println(" - key ( " + key.getClass().toString() + 
"): " + key.toString() + " value ( " + val.getClass().toString() + " ): 
" + val.toString()); sum += val.get(); } result.set(sum); 
context.write(key, result); } public void run(Context context) throws 
IOException, InterruptedException { setup(context); try { while 
(context.nextKey()) { System.out.println("Key: " + 
context.getCurrentKey()); reduce(context.getCurrentKey(), 
context.getValues(), context); // If a back up store is used, reset it 
Iterator<IntWritable> iter = context.getValues().iterator(); if(iter 
instanceof ReduceContext.ValueIterator) { 
((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore(); } } 
} finally { cleanup(context); } } protected void cleanup(Context 
context) throws IOException, InterruptedException { 
parser.cleanup(context); } } } |

​