You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hbase.apache.org by Jothikumar Ekanath <kb...@gmail.com> on 2012/09/11 04:06:17 UTC

java.io.IOException: Pass a Delete or a Put

Hi,
       Getting this error while using hbase as a sink.


Error
java.io.IOException: Pass a Delete or a Put
        at
org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
        at
org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
        at
org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
        at
org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at
org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)



 Below is my code
Using the following version

Hbase = 0.94
Hadoop - 1.0.3

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DailyAggMapReduce {

    public static void main(String args[]) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "DailyAverageMR");
        job.setJarByClass(DailyAggMapReduce.class);
        Scan scan = new Scan();
        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCaching(500);
        // don't set to true for MR jobs
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(
                "HTASDB",        // input table
                scan,               // Scan instance to control CF and
attribute selection
                DailySumMapper.class,     // mapper class
                Text.class,         // mapper output key
                Text.class,  // mapper output value
                job);

        TableMapReduceUtil.initTableReducerJob(
                "DA",        // output table
                DailySumReducer.class,    // reducer class
                job);

        //job.setOutputValueClass(Put.class);
        job.setNumReduceTasks(1);   // at least one, adjust as required

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }

    }


    public static class DailySumMapper extends TableMapper<Text, Text> {

        public void map(ImmutableBytesWritable row, Result value,
Mapper.Context context) throws IOException, InterruptedException {
            List<String> key = getRowKey(row.get());
            Text rowKey = new Text(key.get(0));
            int time = Integer.parseInt(key.get(1));
            //limiting the time for one day (Aug 04 2012) -- Testing, Not a
good way
            if (time <= 1344146400) {
                List<KeyValue> data = value.list();
                long inbound = 0l;
                long outbound = 0l;
                for (KeyValue kv : data) {
                    List<Long> values = getValues(kv.getValue());
                    if (values.get(0) != -1) {
                        inbound = inbound + values.get(0);
                    }
                    if (values.get(1) != -1) {
                        outbound = outbound + values.get(1);
                    }
                }
                context.write(rowKey, new Text(String.valueOf(inbound) +
"-" + String.valueOf(outbound)));
            }
        }

        private static List<Long> getValues(byte[] data) {
            List<Long> values = new ArrayList<Long>();
            ByteBuffer buffer = ByteBuffer.wrap(data);
            values.add(buffer.getLong());
            values.add(buffer.getLong());
            return values;
        }

        private static List<String> getRowKey(byte[] key) {
            List<String> keys = new ArrayList<String>();
            ByteBuffer buffer = ByteBuffer.wrap(key);
            StringBuilder sb = new StringBuilder();
            sb.append(buffer.getInt());
            sb.append("-");
            if (key.length == 13) {
                sb.append(buffer.getInt());
                sb.append("-");
            }
            sb.append(buffer.get());
            keys.add(sb.toString());
            keys.add(String.valueOf(buffer.getInt()));
            return keys;
        }
    }

    public static class DailySumReducer extends TableReducer<Text, Text,
Put> {
        private int count = 0;
        public void reduce(Text key, Iterable<Text> values, Reducer.Context
context) throws IOException, InterruptedException {
            long inbound = 0l;
            long outbound = 0l;
            for (Text val : values) {
                String text = val.toString();
                int index = text.indexOf("-");
                String in = text.substring(0,index);
                String out = text.substring(index+1,text.length());
                inbound = inbound + Long.parseLong(in);
                outbound = outbound + Long.parseLong(out);
            }
            ByteBuffer data = ByteBuffer.wrap(new byte[16]);
            data.putLong(inbound);
            data.putLong(outbound);
            Put put = new Put(Bytes.toBytes(key.toString()+20120804));
            put.add(Bytes.toBytes("t"), Bytes.toBytes("s"),data.array());
            context.setStatus("Emitting Put " + count++);
            context.write(key, put);
        }
    }
}

Thanks
Jothikumar

Re: java.io.IOException: Pass a Delete or a Put

Posted by Jothikumar Ekanath <kb...@gmail.com>.
Hi Doug,
                  That is where i took my code initially, not able to
notice anything different from there. I know there is something wrong with
the key in key out in my code, but not able to figure out.

I have given below what i am using, Do you see anything wrong in there?

DailySumMapper extends TableMapper<Text, Text>
>>     KEYOUT = Text
>>     VALUEOUT = Text
>>
>>  DailySumReducer extends TableReducer<Text, Text,
>>ImmutableBytesWritable>
>>
>>     KEYIN = Text
>>     VALUEIN = Text
>>     KEYOUT = ImmutableBytesWritable
>>     VALUEOUT = must be always Put or Delete when we extend TableReducer,
>> So we are not specifying that.
>>
>> Code
>>  public static class DailySumReducer extends TableReducer<Text, Text,
>> ImmutableBytesWritable> {
>>
>>         private int count = 0;
>>         protected void reduce(Text key, Iterable<Text>
>> values,Reducer.Context context) throws IOException,
>>InterruptedException{
>>
>>             long inbound = 0l;
>>             long outbound = 0l;
>>             for (Text val : values) {
>>                 String text = val.toString();
>>                 int index = text.indexOf("-");
>>                 String in = text.substring(0,index);
>>                 String out = text.substring(index+1,text.
length());
>>                 inbound = inbound + Long.parseLong(in);
>>                 outbound = outbound + Long.parseLong(out);
>>             }
>>             ByteBuffer data = ByteBuffer.wrap(new byte[16]);
>>             data.putLong(inbound);
>>             data.putLong(outbound);
>>             Put put = new Put(Bytes.toBytes(key.toString()+20120804));
>>             put.add(Bytes.toBytes("t"),
>>Bytes.toBytes("s"),data.array());
>>             context.setStatus("Emitting Put " + count++);
>>             ImmutableBytesWritable ibw = new
>> ImmutableBytesWritable(Bytes.toBytes(key.toString()));
>>             context.write(ibw,put);
>>
>>         }
>>     }



On Wed, Sep 12, 2012 at 11:05 AM, Doug Meil
<do...@explorysmedical.com>wrote:

>
> Did you compare your example to this...
>
> http://hbase.apache.org/book.html#mapreduce.example
> 7.2.2. HBase MapReduce Read/Write Example
>
>
> ?
>
>
>
>
> On 9/12/12 1:02 PM, "Jothikumar Ekanath" <kb...@gmail.com> wrote:
>
> >Any help on this one please.
> >
> >On Tue, Sep 11, 2012 at 11:19 AM, Jothikumar Ekanath
> ><kb...@gmail.com>wrote:
> >
> >> Hi Stack,
> >>                 Thanks for the reply. I looked at the code and i am
> >>having
> >> a very basic confusion on how to use it correctly.  The code i wrote
> >> earlier has the following input and output types and i want it that way
> >>
> >> After looking at the sources and examples, i modified my reducer (given
> >> below), the mapper and job configuration are still the same. Still i see
> >> the same error. Am i doing something wrong?
> >>
> >>
> >>  DailySumMapper extends TableMapper<Text, Text>
> >>     KEYOUT = Text
> >>     VALUEOUT = Text
> >>
> >>  DailySumReducer extends TableReducer<Text, Text,
> >>ImmutableBytesWritable>
> >>
> >>     KEYIN = Text
> >>     VALUEIN = Text
> >>     KEYOUT = ImmutableBytesWritable
> >>     VALUEOUT = must be always Put or Delete when we extend TableReducer,
> >> So we are not specifying that.
> >>
> >> Code
> >>  public static class DailySumReducer extends TableReducer<Text, Text,
> >> ImmutableBytesWritable> {
> >>
> >>         private int count = 0;
> >>         protected void reduce(Text key, Iterable<Text>
> >> values,Reducer.Context context) throws IOException,
> >>InterruptedException{
> >>
> >>             long inbound = 0l;
> >>             long outbound = 0l;
> >>             for (Text val : values) {
> >>                 String text = val.toString();
> >>                 int index = text.indexOf("-");
> >>                 String in = text.substring(0,index);
> >>                 String out = text.substring(index+1,text.length());
> >>                 inbound = inbound + Long.parseLong(in);
> >>                 outbound = outbound + Long.parseLong(out);
> >>             }
> >>             ByteBuffer data = ByteBuffer.wrap(new byte[16]);
> >>             data.putLong(inbound);
> >>             data.putLong(outbound);
> >>             Put put = new Put(Bytes.toBytes(key.toString()+20120804));
> >>             put.add(Bytes.toBytes("t"),
> >>Bytes.toBytes("s"),data.array());
> >>             context.setStatus("Emitting Put " + count++);
> >>             ImmutableBytesWritable ibw = new
> >> ImmutableBytesWritable(Bytes.toBytes(key.toString()));
> >>             context.write(ibw,put);
> >>
> >>         }
> >>     }
> >>
> >> On Tue, Sep 11, 2012 at 10:38 AM, Stack <st...@duboce.net> wrote:
> >>
> >>> On Mon, Sep 10, 2012 at 7:06 PM, Jothikumar Ekanath
> >>><kb...@gmail.com>
> >>> wrote:
> >>> > Hi,
> >>> >        Getting this error while using hbase as a sink.
> >>> >
> >>> >
> >>> > Error
> >>> > java.io.IOException: Pass a Delete or a Put
> >>>
> >>> Would suggest you study the mapreduce jobs that ship with hbase both
> >>> in main and under test.
> >>>
> >>> Looking at your program, you are all Text.  The above complaint is
> >>> about wanting a Put or Delete.  Can you change what you produce so
> >>> Put/Delete rather than Text?
> >>>
> >>> St.Ack
> >>>
> >>
> >>
>
>
>

Re: java.io.IOException: Pass a Delete or a Put

Posted by Doug Meil <do...@explorysmedical.com>.
Did you compare your example to this...

http://hbase.apache.org/book.html#mapreduce.example
7.2.2. HBase MapReduce Read/Write Example


?




On 9/12/12 1:02 PM, "Jothikumar Ekanath" <kb...@gmail.com> wrote:

>Any help on this one please.
>
>On Tue, Sep 11, 2012 at 11:19 AM, Jothikumar Ekanath
><kb...@gmail.com>wrote:
>
>> Hi Stack,
>>                 Thanks for the reply. I looked at the code and i am
>>having
>> a very basic confusion on how to use it correctly.  The code i wrote
>> earlier has the following input and output types and i want it that way
>>
>> After looking at the sources and examples, i modified my reducer (given
>> below), the mapper and job configuration are still the same. Still i see
>> the same error. Am i doing something wrong?
>>
>>
>>  DailySumMapper extends TableMapper<Text, Text>
>>     KEYOUT = Text
>>     VALUEOUT = Text
>>
>>  DailySumReducer extends TableReducer<Text, Text,
>>ImmutableBytesWritable>
>>
>>     KEYIN = Text
>>     VALUEIN = Text
>>     KEYOUT = ImmutableBytesWritable
>>     VALUEOUT = must be always Put or Delete when we extend TableReducer,
>> So we are not specifying that.
>>
>> Code
>>  public static class DailySumReducer extends TableReducer<Text, Text,
>> ImmutableBytesWritable> {
>>
>>         private int count = 0;
>>         protected void reduce(Text key, Iterable<Text>
>> values,Reducer.Context context) throws IOException,
>>InterruptedException{
>>
>>             long inbound = 0l;
>>             long outbound = 0l;
>>             for (Text val : values) {
>>                 String text = val.toString();
>>                 int index = text.indexOf("-");
>>                 String in = text.substring(0,index);
>>                 String out = text.substring(index+1,text.length());
>>                 inbound = inbound + Long.parseLong(in);
>>                 outbound = outbound + Long.parseLong(out);
>>             }
>>             ByteBuffer data = ByteBuffer.wrap(new byte[16]);
>>             data.putLong(inbound);
>>             data.putLong(outbound);
>>             Put put = new Put(Bytes.toBytes(key.toString()+20120804));
>>             put.add(Bytes.toBytes("t"),
>>Bytes.toBytes("s"),data.array());
>>             context.setStatus("Emitting Put " + count++);
>>             ImmutableBytesWritable ibw = new
>> ImmutableBytesWritable(Bytes.toBytes(key.toString()));
>>             context.write(ibw,put);
>>
>>         }
>>     }
>>
>> On Tue, Sep 11, 2012 at 10:38 AM, Stack <st...@duboce.net> wrote:
>>
>>> On Mon, Sep 10, 2012 at 7:06 PM, Jothikumar Ekanath
>>><kb...@gmail.com>
>>> wrote:
>>> > Hi,
>>> >        Getting this error while using hbase as a sink.
>>> >
>>> >
>>> > Error
>>> > java.io.IOException: Pass a Delete or a Put
>>>
>>> Would suggest you study the mapreduce jobs that ship with hbase both
>>> in main and under test.
>>>
>>> Looking at your program, you are all Text.  The above complaint is
>>> about wanting a Put or Delete.  Can you change what you produce so
>>> Put/Delete rather than Text?
>>>
>>> St.Ack
>>>
>>
>>



Re: java.io.IOException: Pass a Delete or a Put

Posted by Jothikumar Ekanath <kb...@gmail.com>.
Any help on this one please.

On Tue, Sep 11, 2012 at 11:19 AM, Jothikumar Ekanath <kb...@gmail.com>wrote:

> Hi Stack,
>                 Thanks for the reply. I looked at the code and i am having
> a very basic confusion on how to use it correctly.  The code i wrote
> earlier has the following input and output types and i want it that way
>
> After looking at the sources and examples, i modified my reducer (given
> below), the mapper and job configuration are still the same. Still i see
> the same error. Am i doing something wrong?
>
>
>  DailySumMapper extends TableMapper<Text, Text>
>     KEYOUT = Text
>     VALUEOUT = Text
>
>  DailySumReducer extends TableReducer<Text, Text, ImmutableBytesWritable>
>
>     KEYIN = Text
>     VALUEIN = Text
>     KEYOUT = ImmutableBytesWritable
>     VALUEOUT = must be always Put or Delete when we extend TableReducer,
> So we are not specifying that.
>
> Code
>  public static class DailySumReducer extends TableReducer<Text, Text,
> ImmutableBytesWritable> {
>
>         private int count = 0;
>         protected void reduce(Text key, Iterable<Text>
> values,Reducer.Context context) throws IOException, InterruptedException{
>
>             long inbound = 0l;
>             long outbound = 0l;
>             for (Text val : values) {
>                 String text = val.toString();
>                 int index = text.indexOf("-");
>                 String in = text.substring(0,index);
>                 String out = text.substring(index+1,text.length());
>                 inbound = inbound + Long.parseLong(in);
>                 outbound = outbound + Long.parseLong(out);
>             }
>             ByteBuffer data = ByteBuffer.wrap(new byte[16]);
>             data.putLong(inbound);
>             data.putLong(outbound);
>             Put put = new Put(Bytes.toBytes(key.toString()+20120804));
>             put.add(Bytes.toBytes("t"), Bytes.toBytes("s"),data.array());
>             context.setStatus("Emitting Put " + count++);
>             ImmutableBytesWritable ibw = new
> ImmutableBytesWritable(Bytes.toBytes(key.toString()));
>             context.write(ibw,put);
>
>         }
>     }
>
> On Tue, Sep 11, 2012 at 10:38 AM, Stack <st...@duboce.net> wrote:
>
>> On Mon, Sep 10, 2012 at 7:06 PM, Jothikumar Ekanath <kb...@gmail.com>
>> wrote:
>> > Hi,
>> >        Getting this error while using hbase as a sink.
>> >
>> >
>> > Error
>> > java.io.IOException: Pass a Delete or a Put
>>
>> Would suggest you study the mapreduce jobs that ship with hbase both
>> in main and under test.
>>
>> Looking at your program, you are all Text.  The above complaint is
>> about wanting a Put or Delete.  Can you change what you produce so
>> Put/Delete rather than Text?
>>
>> St.Ack
>>
>
>

Re: java.io.IOException: Pass a Delete or a Put

Posted by Jothikumar Ekanath <kb...@gmail.com>.
Hi Stack,
                Thanks for the reply. I looked at the code and i am having
a very basic confusion on how to use it correctly.  The code i wrote
earlier has the following input and output types and i want it that way

After looking at the sources and examples, i modified my reducer (given
below), the mapper and job configuration are still the same. Still i see
the same error. Am i doing something wrong?

 DailySumMapper extends TableMapper<Text, Text>
    KEYOUT = Text
    VALUEOUT = Text

 DailySumReducer extends TableReducer<Text, Text, ImmutableBytesWritable>

    KEYIN = Text
    VALUEIN = Text
    KEYOUT = ImmutableBytesWritable
    VALUEOUT = must be always Put or Delete when we extend TableReducer, So
we are not specifying that.

Code
 public static class DailySumReducer extends TableReducer<Text, Text,
ImmutableBytesWritable> {
        private int count = 0;
        protected void reduce(Text key, Iterable<Text>
values,Reducer.Context context) throws IOException, InterruptedException{
            long inbound = 0l;
            long outbound = 0l;
            for (Text val : values) {
                String text = val.toString();
                int index = text.indexOf("-");
                String in = text.substring(0,index);
                String out = text.substring(index+1,text.length());
                inbound = inbound + Long.parseLong(in);
                outbound = outbound + Long.parseLong(out);
            }
            ByteBuffer data = ByteBuffer.wrap(new byte[16]);
            data.putLong(inbound);
            data.putLong(outbound);
            Put put = new Put(Bytes.toBytes(key.toString()+20120804));
            put.add(Bytes.toBytes("t"), Bytes.toBytes("s"),data.array());
            context.setStatus("Emitting Put " + count++);
            ImmutableBytesWritable ibw = new
ImmutableBytesWritable(Bytes.toBytes(key.toString()));
            context.write(ibw,put);
        }
    }

On Tue, Sep 11, 2012 at 10:38 AM, Stack <st...@duboce.net> wrote:

> On Mon, Sep 10, 2012 at 7:06 PM, Jothikumar Ekanath <kb...@gmail.com>
> wrote:
> > Hi,
> >        Getting this error while using hbase as a sink.
> >
> >
> > Error
> > java.io.IOException: Pass a Delete or a Put
>
> Would suggest you study the mapreduce jobs that ship with hbase both
> in main and under test.
>
> Looking at your program, you are all Text.  The above complaint is
> about wanting a Put or Delete.  Can you change what you produce so
> Put/Delete rather than Text?
>
> St.Ack
>

Re: java.io.IOException: Pass a Delete or a Put

Posted by Stack <st...@duboce.net>.
On Mon, Sep 10, 2012 at 7:06 PM, Jothikumar Ekanath <kb...@gmail.com> wrote:
> Hi,
>        Getting this error while using hbase as a sink.
>
>
> Error
> java.io.IOException: Pass a Delete or a Put

Would suggest you study the mapreduce jobs that ship with hbase both
in main and under test.

Looking at your program, you are all Text.  The above complaint is
about wanting a Put or Delete.  Can you change what you produce so
Put/Delete rather than Text?

St.Ack

Re: java.io.IOException: Pass a Delete or a Put

Posted by Jothikumar Ekanath <kb...@gmail.com>.
Hi,

I am kind of stuck on this one, I read all the other similar issues and
coded based on that. But still i get this error.

Any help or clue will help me moving forward.

Thanks




On Mon, Sep 10, 2012 at 7:06 PM, Jothikumar Ekanath <kb...@gmail.com>wrote:

> Hi,
>        Getting this error while using hbase as a sink.
>
>
> Error
> java.io.IOException: Pass a Delete or a Put
>         at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
>         at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
>         at
> org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
>         at
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>         at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
>         at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>         at
> org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
>         at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>         at org.apache.hadoop.mapred.Child.main(Child.java:249)
>
>
>
>  Below is my code
> Using the following version
>
> Hbase = 0.94
> Hadoop - 1.0.3
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.KeyValue;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.Result;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> import org.apache.hadoop.hbase.mapreduce.TableMapper;
> import org.apache.hadoop.hbase.mapreduce.TableReducer;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.*;
>
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.util.ArrayList;
> import java.util.List;
>
> public class DailyAggMapReduce {
>
>     public static void main(String args[]) throws Exception {
>         Configuration config = HBaseConfiguration.create();
>         Job job = new Job(config, "DailyAverageMR");
>         job.setJarByClass(DailyAggMapReduce.class);
>         Scan scan = new Scan();
>         // 1 is the default in Scan, which will be bad for MapReduce jobs
>         scan.setCaching(500);
>         // don't set to true for MR jobs
>         scan.setCacheBlocks(false);
>
>         TableMapReduceUtil.initTableMapperJob(
>                 "HTASDB",        // input table
>                 scan,               // Scan instance to control CF and
> attribute selection
>                 DailySumMapper.class,     // mapper class
>                 Text.class,         // mapper output key
>                 Text.class,  // mapper output value
>                 job);
>
>         TableMapReduceUtil.initTableReducerJob(
>                 "DA",        // output table
>                 DailySumReducer.class,    // reducer class
>                 job);
>
>         //job.setOutputValueClass(Put.class);
>         job.setNumReduceTasks(1);   // at least one, adjust as required
>
>         boolean b = job.waitForCompletion(true);
>         if (!b) {
>             throw new IOException("error with job!");
>         }
>
>     }
>
>
>     public static class DailySumMapper extends TableMapper<Text, Text> {
>
>         public void map(ImmutableBytesWritable row, Result value,
> Mapper.Context context) throws IOException, InterruptedException {
>             List<String> key = getRowKey(row.get());
>             Text rowKey = new Text(key.get(0));
>             int time = Integer.parseInt(key.get(1));
>             //limiting the time for one day (Aug 04 2012) -- Testing, Not
> a good way
>             if (time <= 1344146400) {
>                 List<KeyValue> data = value.list();
>                 long inbound = 0l;
>                 long outbound = 0l;
>                 for (KeyValue kv : data) {
>                     List<Long> values = getValues(kv.getValue());
>                     if (values.get(0) != -1) {
>                         inbound = inbound + values.get(0);
>                     }
>                     if (values.get(1) != -1) {
>                         outbound = outbound + values.get(1);
>                     }
>                 }
>                 context.write(rowKey, new Text(String.valueOf(inbound) +
> "-" + String.valueOf(outbound)));
>             }
>         }
>
>         private static List<Long> getValues(byte[] data) {
>             List<Long> values = new ArrayList<Long>();
>             ByteBuffer buffer = ByteBuffer.wrap(data);
>             values.add(buffer.getLong());
>             values.add(buffer.getLong());
>             return values;
>         }
>
>         private static List<String> getRowKey(byte[] key) {
>             List<String> keys = new ArrayList<String>();
>             ByteBuffer buffer = ByteBuffer.wrap(key);
>             StringBuilder sb = new StringBuilder();
>             sb.append(buffer.getInt());
>             sb.append("-");
>             if (key.length == 13) {
>                 sb.append(buffer.getInt());
>                 sb.append("-");
>             }
>             sb.append(buffer.get());
>             keys.add(sb.toString());
>             keys.add(String.valueOf(buffer.getInt()));
>             return keys;
>         }
>     }
>
>     public static class DailySumReducer extends TableReducer<Text, Text,
> Put> {
>         private int count = 0;
>         public void reduce(Text key, Iterable<Text> values,
> Reducer.Context context) throws IOException, InterruptedException {
>             long inbound = 0l;
>             long outbound = 0l;
>             for (Text val : values) {
>                 String text = val.toString();
>                 int index = text.indexOf("-");
>                 String in = text.substring(0,index);
>                 String out = text.substring(index+1,text.length());
>                 inbound = inbound + Long.parseLong(in);
>                 outbound = outbound + Long.parseLong(out);
>             }
>             ByteBuffer data = ByteBuffer.wrap(new byte[16]);
>             data.putLong(inbound);
>             data.putLong(outbound);
>             Put put = new Put(Bytes.toBytes(key.toString()+20120804));
>             put.add(Bytes.toBytes("t"), Bytes.toBytes("s"),data.array());
>             context.setStatus("Emitting Put " + count++);
>             context.write(key, put);
>         }
>     }
> }
>
> Thanks
> Jothikumar
>
>