You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hadoop.apache.org by xeonmailinglist <xe...@gmail.com> on 2015/09/08 01:36:18 UTC
catch MapReduce calls with AOP?
Hi,
In [1] I show a wordcount example. I am trying to pointcut the
invocations of |output.collect| and of the method |cleanup| with AOP, but
I am finding it very difficult to do.
I have tried to set |org.apache.hadoop.mapred.JobConf| and
|org.apache.hadoop.mapreduce.Job| in beans in order to intercept these
calls, but I can’t get it to work.
Is it possible to do this? Am I trying to do something that is impossible?
[1] My wordcount example
|public class MyWordCount { public static class MyMap extends Mapper {
private final static IntWritable one = new IntWritable(1); private Text
word = new Text(); private MedusaDigests parser = new MedusaDigests();
public void map(LongWritable key, Text value, OutputCollector<Text,
IntWritable> output, Reporter reporter) throws IOException {
StringTokenizer itr = new StringTokenizer(value.toString()); while
(itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word,
one); parser.updateDigest(word.getBytes(),
ByteBuffer.allocate(4).putInt(one.get()).array()); } } public void
run(Context context) throws IOException, InterruptedException {
setup(context); try { while (context.nextKeyValue()) {
System.out.println("Key: " + context.getCurrentKey() + " Value: " +
context.getCurrentValue()); map(context.getCurrentKey(),
context.getCurrentValue(), context); } } finally { cleanup(context); } }
protected void cleanup(Context context) throws IOException,
InterruptedException { parser.cleanup(context); } } public static class
MyReducer extends Reducer { private IntWritable result = new
IntWritable(); MedusaDigests parser = new MedusaDigests(); public void
reduce(Text key, Iterable<IntWritable> values, Context context ) throws
IOException, InterruptedException { int sum = 0; for (IntWritable val :
values) { System.out.println(" - key ( " + key.getClass().toString() +
"): " + key.toString() + " value ( " + val.getClass().toString() + " ):
" + val.toString()); sum += val.get(); } result.set(sum);
context.write(key, result); } public void run(Context context) throws
IOException, InterruptedException { setup(context); try { while
(context.nextKey()) { System.out.println("Key: " +
context.getCurrentKey()); reduce(context.getCurrentKey(),
context.getValues(), context); // If a back up store is used, reset it
Iterator<IntWritable> iter = context.getValues().iterator(); if(iter
instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore(); } }
} finally { cleanup(context); } } protected void cleanup(Context
context) throws IOException, InterruptedException {
parser.cleanup(context); } } } |