Posted to user@accumulo.apache.org by Mike Hugo <mi...@piragua.com> on 2013/01/16 18:11:30 UTC

Programmatically invoking a Map/Reduce job

I'm writing a client program that uses the BatchWriter and BatchScanner for
inserting and querying data, but occasionally it also needs to be able to
kick off a Map/Reduce job on a remote Accumulo cluster.  The Map/Reduce
examples that ship with Accumulo look like they are meant to be invoked via
the command line.  Does anyone have an example of how to kick something off
via a Java client running on a separate server?  Any best practices to
share?
Thanks,
Mike

Re: Programmatically invoking a Map/Reduce job

Posted by Mike Hugo <mi...@piragua.com>.
Spot on - thanks Billie, that did the trick!





Re: Programmatically invoking a Map/Reduce job

Posted by Billie Rinaldi <bi...@apache.org>.
On Thu, Jan 17, 2013 at 11:16 AM, Mike Hugo <mi...@piragua.com> wrote:

> Thanks Billie!
>
> Setting "mapred.job.tracker" and "fs.default.name" in the conf has gotten
> me further.
>
>         job.getConfiguration().set("mapred.job.tracker", "server_name_here:8021");
>         job.getConfiguration().set("fs.default.name", "hdfs://server_name_here:8020");
>
> What's interesting now is that the job can't find Accumulo classes - when
> I run the job now, I get
>
> 2013-01-17 12:59:25,278 [main] INFO  mapred.JobClient  - Task Id :
> attempt_201301171102_0012_m_000000_1, Status : FAILED
> java.lang.RuntimeException: java.lang.ClassNotFoundException:
> org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat
>
> Is there a way to inform the job (via the Job API, on a separate machine
> not running hadoop) about extra libs to include on the classpath of the job?
>

You normally inform a job about jars it needs by specifying "-libjars
comma,separated,jar,list" on the command line.  In this case, you need to
put those two strings "-libjars" and "jar,list" in the String[] args passed
to ToolRunner.run:
ToolRunner.run(CachedConfiguration.getInstance(), new ...(), args)

The accumulo-core jar probably isn't the only one you'll need.

Billie
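
As a rough illustration of the suggestion above, the sketch below builds the
String[] that would be handed to ToolRunner.run, with "-libjars" and the
comma-separated jar list placed ahead of the tool's own arguments.  The class
name LibJarsArgs, the method withLibJars, and the jar paths are hypothetical
and only stand in for whatever the client actually needs; the ToolRunner call
is left as a comment so the snippet runs without Hadoop on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Hadoop or Accumulo. */
final class LibJarsArgs {
    private LibJarsArgs() {}

    /**
     * Returns a new argument array with "-libjars" and the comma-joined jar
     * list first, followed by the tool's own arguments unchanged.
     */
    static String[] withLibJars(List<String> jarPaths, String... toolArgs) {
        if (jarPaths.isEmpty()) {
            return toolArgs.clone(); // nothing to add
        }
        List<String> all = new ArrayList<>();
        all.add("-libjars");
        all.add(String.join(",", jarPaths));
        all.addAll(Arrays.asList(toolArgs));
        return all.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Placeholder paths; the real list depends on the job's dependencies.
        List<String> jars = Arrays.asList(
                "/path/to/accumulo-core.jar",
                "/path/to/another-dependency.jar");
        String[] full = withLibJars(jars, args);
        System.out.println(String.join(" ", full));
        // With Hadoop available, the array would then be passed along:
        // ToolRunner.run(CachedConfiguration.getInstance(), new ...(), full);
    }
}
```

The jar list is put first on the assumption that generic options such as
-libjars need to precede the tool's own arguments, as they do on the command
line.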


>
> Thanks
>
> Mike
>
>
>
> On Wed, Jan 16, 2013 at 3:11 PM, Billie Rinaldi <bi...@apache.org> wrote:
>
>> Your job is running in "local" mode (Running job: job_local_0001).  This
>> basically means that the hadoop configuration is not present on the
>> classpath of the java client kicking off the job.  If you weren't planning
>> to have the hadoop config on that machine, you might be able to get away
>> with setting "mapred.job.tracker" and probably also "fs.default.name" on
>> the Configuration object.
>>
>> Billie
>>
>>
>>
>> On Wed, Jan 16, 2013 at 12:07 PM, Mike Hugo <mi...@piragua.com> wrote:
>>
>>> Cool, thanks for the feedback John, the examples have been helpful in
>>> getting up and running!
>>>
>>> Perhaps I'm not doing something quite right.  When I jar up my jobs and
>>> deploy the jar to the server and run it via the tool.sh command on the
>>> cluster, I see the job running in the jobtracker (servername:50030) and it
>>> runs as I would expect.
>>>
>>> 13/01/16 14:39:53 INFO mapred.JobClient: Running job:
>>> job_201301161326_0006
>>> 13/01/16 14:39:54 INFO mapred.JobClient:  map 0% reduce 0%
>>> 13/01/16 14:41:29 INFO mapred.JobClient:  map 50% reduce 0%
>>> 13/01/16 14:41:35 INFO mapred.JobClient:  map 100% reduce 0%
>>> 13/01/16 14:41:40 INFO mapred.JobClient: Job complete:
>>> job_201301161326_0006
>>> 13/01/16 14:41:40 INFO mapred.JobClient: Counters: 18
>>> 13/01/16 14:41:40 INFO mapred.JobClient:   Job Counters
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=180309
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Total time spent by all
>>> reduces waiting after reserving slots (ms)=0
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Total time spent by all
>>> maps waiting after reserving slots (ms)=0
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Rack-local map tasks=2
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Launched map tasks=2
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
>>> 13/01/16 14:41:40 INFO mapred.JobClient:   File Output Format Counters
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Bytes Written=0
>>> 13/01/16 14:41:40 INFO mapred.JobClient:   FileSystemCounters
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     HDFS_BYTES_READ=248
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=60214
>>> 13/01/16 14:41:40 INFO mapred.JobClient:   File Input Format Counters
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Bytes Read=0
>>> 13/01/16 14:41:40 INFO mapred.JobClient:   Map-Reduce Framework
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Map input records=1036434
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Physical memory (bytes)
>>> snapshot=373760000
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Spilled Records=0
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     CPU time spent (ms)=24410
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Total committed heap usage
>>> (bytes)=168394752
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Virtual memory (bytes)
>>> snapshot=2124627968
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     Map output records=2462684
>>> 13/01/16 14:41:40 INFO mapred.JobClient:     SPLIT_RAW_BYTES=248
>>>
>>>
>>>
>>> When I kick off a job via a java client running on a different host, the
>>> job seems to run (I can see things being scanned and ingested) but I don't
>>> see anything via the jobtracker UI on the server.  Is that normal?  Or do I
>>> have something mis-configured?
>>>
>>>
>>>
>>> Here's how I'm starting things from the client:
>>>
>>>     @Override
>>>     public int run(String[] strings) throws Exception {
>>>         Job job = new Job(getConf(), getClass().getSimpleName());
>>>         job.setJarByClass(getClass());
>>>         job.setMapperClass(MyMapper.class);
>>>
>>>         job.setInputFormatClass(AccumuloRowInputFormat.class);
>>>
>>>         AccumuloRowInputFormat.setZooKeeperInstance(job.getConfiguration(),
>>>                 instanceName, zookeepers);
>>>
>>>         AccumuloRowInputFormat.setInputInfo(job.getConfiguration(),
>>>                 username,
>>>                 password.getBytes(),
>>>                 "...",
>>>                 new Authorizations());
>>>
>>>         job.setNumReduceTasks(0);
>>>
>>>         job.setOutputFormatClass(AccumuloOutputFormat.class);
>>>         job.setOutputKeyClass(Key.class);
>>>         job.setOutputValueClass(Mutation.class);
>>>
>>>         boolean createTables = true;
>>>         String defaultTable = "...";
>>>         AccumuloOutputFormat.setOutputInfo(job.getConfiguration(),
>>>                 username,
>>>                 password.getBytes(),
>>>                 createTables,
>>>                 defaultTable);
>>>
>>>
>>>         AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
>>>                 instanceName, zookeepers);
>>>
>>>         job.waitForCompletion(true);
>>>
>>>         return job.isSuccessful() ? 0 : 1;
>>>     }
>>>
>>>     public static void main(String args[]) throws Exception {
>>>         int res = ToolRunner.run(CachedConfiguration.getInstance(),
>>>                 new ...(), args);
>>>         System.exit(res);
>>>     }
>>>
>>>
>>>
>>> Here's the output when I run it via the client application:
>>>
>>>
>>> 2013-01-16 13:55:57,645 [main-SendThread()] INFO  zookeeper.ClientCnxn
>>>  - Opening socket connection to server accumulo/10.1.10.160:2181
>>> 2013-01-16 13:55:57,660 [main-SendThread(accumulo:2181)] INFO
>>>  zookeeper.ClientCnxn  - Socket connection established to accumulo/
>>> 10.1.10.160:2181, initiating session
>>> 2013-01-16 13:55:57,671 [main-SendThread(accumulo:2181)] INFO
>>>  zookeeper.ClientCnxn  - Session establishment complete on server accumulo/
>>> 10.1.10.160:2181, sessionid = 0x13c449cfe010434, negotiated timeout =
>>> 30000
>>> 2013-01-16 13:55:58,379 [main] INFO  mapred.JobClient  - Running job:
>>> job_local_0001
>>> 2013-01-16 13:55:58,447 [Thread-16] INFO  mapred.Task  -  Using
>>> ResourceCalculatorPlugin : null
>>> 2013-01-16 13:55:59,383 [main] INFO  mapred.JobClient  -  map 0% reduce
>>> 0%
>>> 2013-01-16 13:56:04,458 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:07,459 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:10,461 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:13,462 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:16,463 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:19,465 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:21,783 [Thread-16] INFO  mapred.Task  -
>>> Task:attempt_local_0001_m_000000_0 is done. And is in the process of
>>> commiting
>>> 2013-01-16 13:56:21,783 [Thread-16] INFO  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:21,784 [Thread-16] INFO  mapred.Task  - Task
>>> 'attempt_local_0001_m_000000_0' done.
>>> 2013-01-16 13:56:21,786 [Thread-16] INFO  mapred.Task  -  Using
>>> ResourceCalculatorPlugin : null
>>> 2013-01-16 13:56:22,423 [main] INFO  mapred.JobClient  -  map 100%
>>> reduce 0%
>>> 2013-01-16 13:56:27,788 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:28,440 [main] INFO  mapred.JobClient  -  map 50% reduce
>>> 0%
>>> 2013-01-16 13:56:30,790 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:33,791 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:36,792 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:39,793 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:42,794 [communication thread] INFO
>>>  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:45,779 [Thread-16] INFO  mapred.Task  -
>>> Task:attempt_local_0001_m_000001_0 is done. And is in the process of
>>> commiting
>>> 2013-01-16 13:56:45,780 [Thread-16] INFO  mapred.LocalJobRunner  -
>>> 2013-01-16 13:56:45,781 [Thread-16] INFO  mapred.Task  - Task
>>> 'attempt_local_0001_m_000001_0' done.
>>> 2013-01-16 13:56:45,782 [Thread-16] WARN  mapred.FileOutputCommitter  -
>>> Output path is null in cleanup
>>> 2013-01-16 13:56:46,462 [main] INFO  mapred.JobClient  -  map 100%
>>> reduce 0%
>>> 2013-01-16 13:56:46,462 [main] INFO  mapred.JobClient  - Job complete:
>>> job_local_0001
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  - Counters: 7
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
>>> FileSystemCounters
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
>>> FILE_BYTES_READ=1257
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
>>> FILE_BYTES_WRITTEN=106136
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -   Map-Reduce
>>> Framework
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Map input
>>> records=1036434
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Spilled
>>> Records=0
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Total
>>> committed heap usage (bytes)=259915776
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Map output
>>> records=2462684
>>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
>>> SPLIT_RAW_BYTES=240
>>>
>>>
>>>
>>> On Wed, Jan 16, 2013 at 11:20 AM, John Vines <vi...@apache.org> wrote:
>>>
>>>> The code examples we have scripted simply do the necessary setup for
>>>> creating a mapreduce job and kicking it off. If you check out the code for
>>>> them in
>>>> src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/
>>>> you can see what we're doing in Java to kick off jobs.
>>>>
>>>> The short explanation is, just like any other MapReduce job, we're
>>>> setting up a Job, configuring the AccumuloInput and/or OutputFormats, and
>>>> sending them off like any other MapReduce job.
>>>>
>>>> John
>>>>
>>>>
>>>> On Wed, Jan 16, 2013 at 12:11 PM, Mike Hugo <mi...@piragua.com> wrote:
>>>>
>>>>> I'm writing a client program that uses the BatchWriter and
>>>>> BatchScanner for inserting and querying data, but occasionally it also
>>>>> needs to be able to kick off a Map/Reduce job on a remote Accumulo cluster.
>>>>> The Map/Reduce examples that ship with Accumulo look like they are meant
>>>>> to be invoked via the command line.  Does anyone have an example of how to
>>>>> kick something off via a Java client running on a separate server?  Any
>>>>> best practices to share?
>>>>> Thanks,
>>>>> Mike
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Programmatically invoking a Map/Reduce job

Posted by Mike Hugo <mi...@piragua.com>.
Thanks Billie!

Setting "mapred.job.tracker" and "fs.default.name" in the conf has gotten
me further.

        job.getConfiguration().set("mapred.job.tracker", "server_name_here:8021");
        job.getConfiguration().set("fs.default.name", "hdfs://server_name_here:8020");
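
One way to keep these two overrides in a single place is sketched below.  It
only assembles the key/value pairs; the class name RemoteClusterSettings and
the method forCluster are hypothetical, and the host and ports are the
placeholders from the snippet above, not recommended values.  Copying the
entries onto the job's Configuration is shown as a comment so the example
runs without Hadoop.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that collects the two client-side overrides. */
final class RemoteClusterSettings {
    private RemoteClusterSettings() {}

    /**
     * Builds the property overrides for a client that has no Hadoop config
     * files on its classpath. Host and ports are supplied by the caller.
     */
    static Map<String, String> forCluster(String host, int jobTrackerPort, int nameNodePort) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("mapred.job.tracker", host + ":" + jobTrackerPort);
        settings.put("fs.default.name", "hdfs://" + host + ":" + nameNodePort);
        return settings;
    }

    public static void main(String[] args) {
        // With Hadoop on the classpath, each entry would be applied with
        // job.getConfiguration().set(key, value) before submitting the job.
        forCluster("server_name_here", 8021, 8020)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```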

What's interesting now is that the job can't find Accumulo classes - when I
run the job now, I get

2013-01-17 12:59:25,278 [main] INFO  mapred.JobClient  - Task Id :
attempt_201301171102_0012_m_000000_1, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException:
org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat

Is there a way to inform the job (via the Job API, on a separate machine
not running Hadoop) about extra libs to include on the classpath of the job?
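
When working out which jars the job is missing, it can help to ask the client
JVM where a class was loaded from.  The sketch below uses only standard JDK
calls; the class name JarLocator is hypothetical, and this is a general Java
technique rather than anything specific to Accumulo or Hadoop.  Classes that
belong to the JDK runtime itself normally report no code source, so the
result is wrapped in an Optional.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

/** Hypothetical diagnostic helper: reports where a class was loaded from. */
final class JarLocator {
    private JarLocator() {}

    /** Returns the jar or directory a class came from, if the JVM knows it. */
    static Optional<URL> locate(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null
                ? Optional.empty()
                : Optional.ofNullable(source.getLocation());
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name, for example the class named in a
        // ClassNotFoundException; defaults to a JDK class for demonstration.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        Optional<URL> location = locate(Class.forName(name));
        System.out.println(name + " -> "
                + location.map(URL::toString).orElse("(no code source reported)"));
    }
}
```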

Thanks

Mike



On Wed, Jan 16, 2013 at 3:11 PM, Billie Rinaldi <bi...@apache.org> wrote:

> Your job is running in "local" mode (Running job: job_local_0001).  This
> basically means that the hadoop configuration is not present on the
> classpath of the java client kicking off the job.  If you weren't planning
> to have the hadoop config on that machine, you might be able to get away
> with setting "mapred.job.tracker" and probably also "fs.default.name" on
> the Configuration object.
>
> Billie
>
>
>
> On Wed, Jan 16, 2013 at 12:07 PM, Mike Hugo <mi...@piragua.com> wrote:
>
>> Cool, thanks for the feedback John, the examples have been helpful in
>> getting up and running!
>>
>> Perhaps I'm not doing something quite right.  When I jar up my jobs and
>> deploy the jar to the server and run it via the tool.sh command on the
>> cluster, I see the job running in the jobtracker (servername:50030) and it
>> runs as I would expect.
>>
>> 13/01/16 14:39:53 INFO mapred.JobClient: Running job:
>> job_201301161326_0006
>> 13/01/16 14:39:54 INFO mapred.JobClient:  map 0% reduce 0%
>> 13/01/16 14:41:29 INFO mapred.JobClient:  map 50% reduce 0%
>> 13/01/16 14:41:35 INFO mapred.JobClient:  map 100% reduce 0%
>> 13/01/16 14:41:40 INFO mapred.JobClient: Job complete:
>> job_201301161326_0006
>> 13/01/16 14:41:40 INFO mapred.JobClient: Counters: 18
>> 13/01/16 14:41:40 INFO mapred.JobClient:   Job Counters
>> 13/01/16 14:41:40 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=180309
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Total time spent by all
>> reduces waiting after reserving slots (ms)=0
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Total time spent by all maps
>> waiting after reserving slots (ms)=0
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Rack-local map tasks=2
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Launched map tasks=2
>> 13/01/16 14:41:40 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
>> 13/01/16 14:41:40 INFO mapred.JobClient:   File Output Format Counters
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Bytes Written=0
>> 13/01/16 14:41:40 INFO mapred.JobClient:   FileSystemCounters
>> 13/01/16 14:41:40 INFO mapred.JobClient:     HDFS_BYTES_READ=248
>> 13/01/16 14:41:40 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=60214
>> 13/01/16 14:41:40 INFO mapred.JobClient:   File Input Format Counters
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Bytes Read=0
>> 13/01/16 14:41:40 INFO mapred.JobClient:   Map-Reduce Framework
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Map input records=1036434
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Physical memory (bytes)
>> snapshot=373760000
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Spilled Records=0
>> 13/01/16 14:41:40 INFO mapred.JobClient:     CPU time spent (ms)=24410
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Total committed heap usage
>> (bytes)=168394752
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Virtual memory (bytes)
>> snapshot=2124627968
>> 13/01/16 14:41:40 INFO mapred.JobClient:     Map output records=2462684
>> 13/01/16 14:41:40 INFO mapred.JobClient:     SPLIT_RAW_BYTES=248
>>
>>
>>
>> When I kick off a job via a java client running on a different host, the
>> job seems to run (I can see things being scanned and ingested) but I don't
>> see anything via the jobtracker UI on the server.  Is that normal?  Or do I
>> have something mis-configured?
>>
>>
>>
>> Here's how I'm starting things from the client:
>>
>>     @Override
>>     public int run(String[] strings) throws Exception {
>>         Job job = new Job(getConf(), getClass().getSimpleName());
>>         job.setJarByClass(getClass());
>>         job.setMapperClass(MyMapper.class);
>>
>>         job.setInputFormatClass(AccumuloRowInputFormat.class);
>>
>>         AccumuloRowInputFormat.setZooKeeperInstance(job.getConfiguration(),
>>                 instanceName, zookeepers);
>>
>>         AccumuloRowInputFormat.setInputInfo(job.getConfiguration(),
>>                 username,
>>                 password.getBytes(),
>>                 "...",
>>                 new Authorizations());
>>
>>         job.setNumReduceTasks(0);
>>
>>         job.setOutputFormatClass(AccumuloOutputFormat.class);
>>         job.setOutputKeyClass(Key.class);
>>         job.setOutputValueClass(Mutation.class);
>>
>>         boolean createTables = true;
>>         String defaultTable = "...";
>>         AccumuloOutputFormat.setOutputInfo(job.getConfiguration(),
>>                 username,
>>                 password.getBytes(),
>>                 createTables,
>>                 defaultTable);
>>
>>         AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
>>                 instanceName, zookeepers);
>>
>>         job.waitForCompletion(true);
>>
>>         return job.isSuccessful() ? 0 : 1;
>>     }
>>
>>     public static void main(String args[]) throws Exception {
>>         int res = ToolRunner.run(CachedConfiguration.getInstance(),
>>                 new ...(), args);
>>         System.exit(res);
>>     }
>>
>>
>>
>> Here's the output when I run it via the client application:
>>
>>
>> 2013-01-16 13:55:57,645 [main-SendThread()] INFO  zookeeper.ClientCnxn  -
>> Opening socket connection to server accumulo/10.1.10.160:2181
>> 2013-01-16 13:55:57,660 [main-SendThread(accumulo:2181)] INFO
>>  zookeeper.ClientCnxn  - Socket connection established to accumulo/
>> 10.1.10.160:2181, initiating session
>> 2013-01-16 13:55:57,671 [main-SendThread(accumulo:2181)] INFO
>>  zookeeper.ClientCnxn  - Session establishment complete on server accumulo/
>> 10.1.10.160:2181, sessionid = 0x13c449cfe010434, negotiated timeout =
>> 30000
>> 2013-01-16 13:55:58,379 [main] INFO  mapred.JobClient  - Running job:
>> job_local_0001
>> 2013-01-16 13:55:58,447 [Thread-16] INFO  mapred.Task  -  Using
>> ResourceCalculatorPlugin : null
>> 2013-01-16 13:55:59,383 [main] INFO  mapred.JobClient  -  map 0% reduce 0%
>> 2013-01-16 13:56:04,458 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:07,459 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:10,461 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:13,462 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:16,463 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:19,465 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:21,783 [Thread-16] INFO  mapred.Task  -
>> Task:attempt_local_0001_m_000000_0 is done. And is in the process of
>> commiting
>> 2013-01-16 13:56:21,783 [Thread-16] INFO  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:21,784 [Thread-16] INFO  mapred.Task  - Task
>> 'attempt_local_0001_m_000000_0' done.
>> 2013-01-16 13:56:21,786 [Thread-16] INFO  mapred.Task  -  Using
>> ResourceCalculatorPlugin : null
>> 2013-01-16 13:56:22,423 [main] INFO  mapred.JobClient  -  map 100% reduce
>> 0%
>> 2013-01-16 13:56:27,788 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:28,440 [main] INFO  mapred.JobClient  -  map 50% reduce
>> 0%
>> 2013-01-16 13:56:30,790 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:33,791 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:36,792 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:39,793 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:42,794 [communication thread] INFO
>>  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:45,779 [Thread-16] INFO  mapred.Task  -
>> Task:attempt_local_0001_m_000001_0 is done. And is in the process of
>> commiting
>> 2013-01-16 13:56:45,780 [Thread-16] INFO  mapred.LocalJobRunner  -
>> 2013-01-16 13:56:45,781 [Thread-16] INFO  mapred.Task  - Task
>> 'attempt_local_0001_m_000001_0' done.
>> 2013-01-16 13:56:45,782 [Thread-16] WARN  mapred.FileOutputCommitter  -
>> Output path is null in cleanup
>> 2013-01-16 13:56:46,462 [main] INFO  mapred.JobClient  -  map 100% reduce
>> 0%
>> 2013-01-16 13:56:46,462 [main] INFO  mapred.JobClient  - Job complete:
>> job_local_0001
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  - Counters: 7
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
>> FileSystemCounters
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
>> FILE_BYTES_READ=1257
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
>> FILE_BYTES_WRITTEN=106136
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -   Map-Reduce
>> Framework
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Map input
>> records=1036434
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Spilled
>> Records=0
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Total
>> committed heap usage (bytes)=259915776
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Map output
>> records=2462684
>> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
>> SPLIT_RAW_BYTES=240
>>
>>
>>
>> On Wed, Jan 16, 2013 at 11:20 AM, John Vines <vi...@apache.org> wrote:
>>
>>> The code examples we have scripted simply do the necessary setup for
>>> creating a mapreduce job and kicking it off. If you check out the code for
>>> them in
>>> src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/
>>> you can see what we're doing in Java to kick off jobs.
>>>
>>> The short explanation is, just like any other MapReduce job, we're
>>> setting up a Job, configuring the AccumuloInput and/or OutputFormats, and
>>> sending them off like any other MapReduce job.
>>>
>>> John
>>>
>>>
>>> On Wed, Jan 16, 2013 at 12:11 PM, Mike Hugo <mi...@piragua.com> wrote:
>>>
>>>> I'm writing a client program that uses the BatchWriter and BatchScanner
>>>> for inserting and querying data, but occasionally it also needs to be able
>>>> to kick off a Map/Reduce job on a remote Accumulo cluster.  The Map/Reduce
>>>> examples that ship with Accumulo look like they are meant to be invoked via
>>>> the command line.  Does anyone have an example of how to kick something off
>>>> via a Java client running on a separate server?  Any best practices to
>>>> share?
>>>> Thanks,
>>>> Mike
>>>>
>>>
>>>
>>
>

Re: Programtically invoking a Map/Reduce job

Posted by Billie Rinaldi <bi...@apache.org>.
Your job is running in "local" mode (Running job: job_local_0001).  This
basically means that the Hadoop configuration is not present on the
classpath of the Java client kicking off the job.  If you weren't planning
to have the Hadoop config on that machine, you might be able to get away
with setting "mapred.job.tracker" and probably also "fs.default.name" on
the Configuration object.
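
As a rough sketch of the two points above (not a definitive implementation),
the helper below checks whether a job ID came from the local runner and
assembles the two override properties.  The class name, method names, host
names, and ports are all hypothetical placeholders; the real values have to
match the cluster's own mapred-site.xml and core-site.xml.  It uses only the
JDK so it runs without Hadoop on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Hadoop or Accumulo.
public class RemoteJobSettings {

    // Job IDs such as "job_local_0001" come from the LocalJobRunner,
    // i.e. the job ran inside the client JVM rather than on the cluster.
    public static boolean isLocalJobId(String jobId) {
        return jobId != null && jobId.startsWith("job_local_");
    }

    // Builds the two properties named above. Hosts and ports are
    // assumptions supplied by the caller, not values known to this code.
    public static Map<String, String> clusterOverrides(String jobTrackerHost, int jobTrackerPort,
                                                       String nameNodeHost, int nameNodePort) {
        Map<String, String> overrides = new LinkedHashMap<>();
        overrides.put("mapred.job.tracker", jobTrackerHost + ":" + jobTrackerPort);
        overrides.put("fs.default.name", "hdfs://" + nameNodeHost + ":" + nameNodePort);
        return overrides;
    }

    public static void main(String[] args) {
        System.out.println(isLocalJobId("job_local_0001"));
        System.out.println(isLocalJobId("job_201301161326_0006"));

        // Placeholder hosts and ports; substitute your cluster's values.
        Map<String, String> overrides =
                clusterOverrides("jobtracker.example.com", 8021, "namenode.example.com", 8020);
        for (Map.Entry<String, String> e : overrides.entrySet()) {
            // With Hadoop on the classpath, each entry would be applied with
            // job.getConfiguration().set(e.getKey(), e.getValue());
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

The overrides would need to be applied before the job is submitted, and
whether they are enough on their own depends on the cluster setup.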

Billie


On Wed, Jan 16, 2013 at 12:07 PM, Mike Hugo <mi...@piragua.com> wrote:

> Cool, thanks for the feedback John, the examples have been helpful in
> getting up and running!
>
> Perhaps I'm not doing something quite right.  When I jar up my jobs and
> deploy the jar to the server and run it via the tool.sh command on the
> cluster, I see the job running in the jobtracker (servername:50030) and it
> runs as I would expect.
>
> 13/01/16 14:39:53 INFO mapred.JobClient: Running job: job_201301161326_0006
> 13/01/16 14:39:54 INFO mapred.JobClient:  map 0% reduce 0%
> 13/01/16 14:41:29 INFO mapred.JobClient:  map 50% reduce 0%
> 13/01/16 14:41:35 INFO mapred.JobClient:  map 100% reduce 0%
> 13/01/16 14:41:40 INFO mapred.JobClient: Job complete:
> job_201301161326_0006
> 13/01/16 14:41:40 INFO mapred.JobClient: Counters: 18
> 13/01/16 14:41:40 INFO mapred.JobClient:   Job Counters
> 13/01/16 14:41:40 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=180309
> 13/01/16 14:41:40 INFO mapred.JobClient:     Total time spent by all
> reduces waiting after reserving slots (ms)=0
> 13/01/16 14:41:40 INFO mapred.JobClient:     Total time spent by all maps
> waiting after reserving slots (ms)=0
> 13/01/16 14:41:40 INFO mapred.JobClient:     Rack-local map tasks=2
> 13/01/16 14:41:40 INFO mapred.JobClient:     Launched map tasks=2
> 13/01/16 14:41:40 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
> 13/01/16 14:41:40 INFO mapred.JobClient:   File Output Format Counters
> 13/01/16 14:41:40 INFO mapred.JobClient:     Bytes Written=0
> 13/01/16 14:41:40 INFO mapred.JobClient:   FileSystemCounters
> 13/01/16 14:41:40 INFO mapred.JobClient:     HDFS_BYTES_READ=248
> 13/01/16 14:41:40 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=60214
> 13/01/16 14:41:40 INFO mapred.JobClient:   File Input Format Counters
> 13/01/16 14:41:40 INFO mapred.JobClient:     Bytes Read=0
> 13/01/16 14:41:40 INFO mapred.JobClient:   Map-Reduce Framework
> 13/01/16 14:41:40 INFO mapred.JobClient:     Map input records=1036434
> 13/01/16 14:41:40 INFO mapred.JobClient:     Physical memory (bytes)
> snapshot=373760000
> 13/01/16 14:41:40 INFO mapred.JobClient:     Spilled Records=0
> 13/01/16 14:41:40 INFO mapred.JobClient:     CPU time spent (ms)=24410
> 13/01/16 14:41:40 INFO mapred.JobClient:     Total committed heap usage
> (bytes)=168394752
> 13/01/16 14:41:40 INFO mapred.JobClient:     Virtual memory (bytes)
> snapshot=2124627968
> 13/01/16 14:41:40 INFO mapred.JobClient:     Map output records=2462684
> 13/01/16 14:41:40 INFO mapred.JobClient:     SPLIT_RAW_BYTES=248
>
>
>
> When I kick off a job via a Java client running on a different host, the
> job seems to run (I can see things being scanned and ingested) but I don't
> see anything via the jobtracker UI on the server.  Is that normal?  Or do I
> have something misconfigured?
>
>
>
> Here's how I'm starting things from the client:
>
>     @Override
>     public int run(String[] strings) throws Exception {
>         Job job = new Job(getConf(), getClass().getSimpleName());
>         job.setJarByClass(getClass());
>         job.setMapperClass(MyMapper.class);
>
>         job.setInputFormatClass(AccumuloRowInputFormat.class);
>
> AccumuloRowInputFormat.setZooKeeperInstance(job.getConfiguration(),
> instanceName, zookeepers);
>
>         AccumuloRowInputFormat.setInputInfo(job.getConfiguration(),
>                 username,
>                 password.getBytes(),
>                 "...",
>                 new Authorizations());
>
>         job.setNumReduceTasks(0);
>
>         job.setOutputFormatClass(AccumuloOutputFormat.class);
>         job.setOutputKeyClass(Key.class);
>         job.setOutputValueClass(Mutation.class);
>
>         boolean createTables = true;
>         String defaultTable = "...";
>         AccumuloOutputFormat.setOutputInfo(job.getConfiguration(),
>                 username,
>                 password.getBytes(),
>                 createTables,
>                 defaultTable);
>
>         AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
> instanceName, zookeepers);
>
>         job.waitForCompletion(true);
>
>         return job.isSuccessful() ? 0 : 1;
>     }
>
>     public static void main(String args[]) throws Exception {
>         int res = ToolRunner.run(CachedConfiguration.getInstance(), new
> ...(), args);
>         System.exit(res);
>     }
>
>
>
> Here's the output when I run it via the client application:
>
>
> 2013-01-16 13:55:57,645 [main-SendThread()] INFO  zookeeper.ClientCnxn  -
> Opening socket connection to server accumulo/10.1.10.160:2181
> 2013-01-16 13:55:57,660 [main-SendThread(accumulo:2181)] INFO
>  zookeeper.ClientCnxn  - Socket connection established to accumulo/
> 10.1.10.160:2181, initiating session
> 2013-01-16 13:55:57,671 [main-SendThread(accumulo:2181)] INFO
>  zookeeper.ClientCnxn  - Session establishment complete on server accumulo/
> 10.1.10.160:2181, sessionid = 0x13c449cfe010434, negotiated timeout =
> 30000
> 2013-01-16 13:55:58,379 [main] INFO  mapred.JobClient  - Running job:
> job_local_0001
> 2013-01-16 13:55:58,447 [Thread-16] INFO  mapred.Task  -  Using
> ResourceCalculatorPlugin : null
> 2013-01-16 13:55:59,383 [main] INFO  mapred.JobClient  -  map 0% reduce 0%
> 2013-01-16 13:56:04,458 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:07,459 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:10,461 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:13,462 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:16,463 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:19,465 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:21,783 [Thread-16] INFO  mapred.Task  -
> Task:attempt_local_0001_m_000000_0 is done. And is in the process of
> commiting
> 2013-01-16 13:56:21,783 [Thread-16] INFO  mapred.LocalJobRunner  -
> 2013-01-16 13:56:21,784 [Thread-16] INFO  mapred.Task  - Task
> 'attempt_local_0001_m_000000_0' done.
> 2013-01-16 13:56:21,786 [Thread-16] INFO  mapred.Task  -  Using
> ResourceCalculatorPlugin : null
> 2013-01-16 13:56:22,423 [main] INFO  mapred.JobClient  -  map 100% reduce
> 0%
> 2013-01-16 13:56:27,788 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:28,440 [main] INFO  mapred.JobClient  -  map 50% reduce 0%
> 2013-01-16 13:56:30,790 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:33,791 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:36,792 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:39,793 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:42,794 [communication thread] INFO  mapred.LocalJobRunner
>  -
> 2013-01-16 13:56:45,779 [Thread-16] INFO  mapred.Task  -
> Task:attempt_local_0001_m_000001_0 is done. And is in the process of
> commiting
> 2013-01-16 13:56:45,780 [Thread-16] INFO  mapred.LocalJobRunner  -
> 2013-01-16 13:56:45,781 [Thread-16] INFO  mapred.Task  - Task
> 'attempt_local_0001_m_000001_0' done.
> 2013-01-16 13:56:45,782 [Thread-16] WARN  mapred.FileOutputCommitter  -
> Output path is null in cleanup
> 2013-01-16 13:56:46,462 [main] INFO  mapred.JobClient  -  map 100% reduce
> 0%
> 2013-01-16 13:56:46,462 [main] INFO  mapred.JobClient  - Job complete:
> job_local_0001
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  - Counters: 7
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
> FileSystemCounters
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
> FILE_BYTES_READ=1257
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
> FILE_BYTES_WRITTEN=106136
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -   Map-Reduce
> Framework
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Map input
> records=1036434
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Spilled
> Records=0
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Total
> committed heap usage (bytes)=259915776
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Map output
> records=2462684
> 2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -
> SPLIT_RAW_BYTES=240
>
>
>
> On Wed, Jan 16, 2013 at 11:20 AM, John Vines <vi...@apache.org> wrote:
>
>> The code examples we have scripted simply do the necessary setup for
>> creating a mapreduce job and kicking it off. If you check out the code for
>> them in
>> src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/
>> you can see what we're doing in Java to kick off jobs.
>>
>> The short explanation is, just like any other MapReduce job, we're
>> setting up a Job, configuring the AccumuloInput and/or OutputFormats, and
>> sending them off like any other MapReduce job.
>>
>> John
>>
>>
>> On Wed, Jan 16, 2013 at 12:11 PM, Mike Hugo <mi...@piragua.com> wrote:
>>
>>> I'm writing a client program that uses the BatchWriter and BatchScanner
>>> for inserting and querying data, but occasionally it also needs to be able
>>> to kick off a Map/Reduce job on a remote Accumulo cluster.  The Map/Reduce
>>> examples that ship with Accumulo look like they are meant to be invoked via
>>> the command line.  Does anyone have an example of how to kick something off
>>> via a Java client running on a separate server?  Any best practices to
>>> share?
>>> Thanks,
>>> Mike
>>>
>>
>>
>

Re: Programtically invoking a Map/Reduce job

Posted by Mike Hugo <mi...@piragua.com>.
Cool, thanks for the feedback John, the examples have been helpful in
getting up and running!

Perhaps I'm not doing something quite right.  When I jar up my jobs and
deploy the jar to the server and run it via the tool.sh command on the
cluster, I see the job running in the jobtracker (servername:50030) and it
runs as I would expect.

13/01/16 14:39:53 INFO mapred.JobClient: Running job: job_201301161326_0006
13/01/16 14:39:54 INFO mapred.JobClient:  map 0% reduce 0%
13/01/16 14:41:29 INFO mapred.JobClient:  map 50% reduce 0%
13/01/16 14:41:35 INFO mapred.JobClient:  map 100% reduce 0%
13/01/16 14:41:40 INFO mapred.JobClient: Job complete: job_201301161326_0006
13/01/16 14:41:40 INFO mapred.JobClient: Counters: 18
13/01/16 14:41:40 INFO mapred.JobClient:   Job Counters
13/01/16 14:41:40 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=180309
13/01/16 14:41:40 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/01/16 14:41:40 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/01/16 14:41:40 INFO mapred.JobClient:     Rack-local map tasks=2
13/01/16 14:41:40 INFO mapred.JobClient:     Launched map tasks=2
13/01/16 14:41:40 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
13/01/16 14:41:40 INFO mapred.JobClient:   File Output Format Counters
13/01/16 14:41:40 INFO mapred.JobClient:     Bytes Written=0
13/01/16 14:41:40 INFO mapred.JobClient:   FileSystemCounters
13/01/16 14:41:40 INFO mapred.JobClient:     HDFS_BYTES_READ=248
13/01/16 14:41:40 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=60214
13/01/16 14:41:40 INFO mapred.JobClient:   File Input Format Counters
13/01/16 14:41:40 INFO mapred.JobClient:     Bytes Read=0
13/01/16 14:41:40 INFO mapred.JobClient:   Map-Reduce Framework
13/01/16 14:41:40 INFO mapred.JobClient:     Map input records=1036434
13/01/16 14:41:40 INFO mapred.JobClient:     Physical memory (bytes) snapshot=373760000
13/01/16 14:41:40 INFO mapred.JobClient:     Spilled Records=0
13/01/16 14:41:40 INFO mapred.JobClient:     CPU time spent (ms)=24410
13/01/16 14:41:40 INFO mapred.JobClient:     Total committed heap usage (bytes)=168394752
13/01/16 14:41:40 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=2124627968
13/01/16 14:41:40 INFO mapred.JobClient:     Map output records=2462684
13/01/16 14:41:40 INFO mapred.JobClient:     SPLIT_RAW_BYTES=248



When I kick off a job via a Java client running on a different host, the
job seems to run (I can see things being scanned and ingested) but I don't
see anything via the jobtracker UI on the server.  Is that normal?  Or do I
have something misconfigured?



Here's how I'm starting things from the client:

    @Override
    public int run(String[] strings) throws Exception {
        // Map-only job: reads via AccumuloRowInputFormat and writes
        // Mutations via AccumuloOutputFormat.
        Job job = new Job(getConf(), getClass().getSimpleName());
        job.setJarByClass(getClass());
        job.setMapperClass(MyMapper.class);

        // Input side: Accumulo instance, credentials, table, authorizations.
        job.setInputFormatClass(AccumuloRowInputFormat.class);
        AccumuloRowInputFormat.setZooKeeperInstance(job.getConfiguration(),
                instanceName, zookeepers);
        AccumuloRowInputFormat.setInputInfo(job.getConfiguration(),
                username,
                password.getBytes(),
                "...",
                new Authorizations());

        // No reducers; mapper output goes straight to the output format.
        job.setNumReduceTasks(0);

        // Output side: Accumulo instance, credentials, default table.
        job.setOutputFormatClass(AccumuloOutputFormat.class);
        job.setOutputKeyClass(Key.class);
        job.setOutputValueClass(Mutation.class);

        boolean createTables = true;
        String defaultTable = "...";
        AccumuloOutputFormat.setOutputInfo(job.getConfiguration(),
                username,
                password.getBytes(),
                createTables,
                defaultTable);
        AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
                instanceName, zookeepers);

        // Blocks until the job finishes; true prints progress to the log.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(CachedConfiguration.getInstance(), new ...(), args);
        System.exit(res);
    }



Here's the output when I run it via the client application:


2013-01-16 13:55:57,645 [main-SendThread()] INFO  zookeeper.ClientCnxn  - Opening socket connection to server accumulo/10.1.10.160:2181
2013-01-16 13:55:57,660 [main-SendThread(accumulo:2181)] INFO  zookeeper.ClientCnxn  - Socket connection established to accumulo/10.1.10.160:2181, initiating session
2013-01-16 13:55:57,671 [main-SendThread(accumulo:2181)] INFO  zookeeper.ClientCnxn  - Session establishment complete on server accumulo/10.1.10.160:2181, sessionid = 0x13c449cfe010434, negotiated timeout = 30000
2013-01-16 13:55:58,379 [main] INFO  mapred.JobClient  - Running job: job_local_0001
2013-01-16 13:55:58,447 [Thread-16] INFO  mapred.Task  -  Using ResourceCalculatorPlugin : null
2013-01-16 13:55:59,383 [main] INFO  mapred.JobClient  -  map 0% reduce 0%
2013-01-16 13:56:04,458 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:07,459 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:10,461 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:13,462 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:16,463 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:19,465 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:21,783 [Thread-16] INFO  mapred.Task  - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-01-16 13:56:21,783 [Thread-16] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:21,784 [Thread-16] INFO  mapred.Task  - Task 'attempt_local_0001_m_000000_0' done.
2013-01-16 13:56:21,786 [Thread-16] INFO  mapred.Task  -  Using ResourceCalculatorPlugin : null
2013-01-16 13:56:22,423 [main] INFO  mapred.JobClient  -  map 100% reduce 0%
2013-01-16 13:56:27,788 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:28,440 [main] INFO  mapred.JobClient  -  map 50% reduce 0%
2013-01-16 13:56:30,790 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:33,791 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:36,792 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:39,793 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:42,794 [communication thread] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:45,779 [Thread-16] INFO  mapred.Task  - Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2013-01-16 13:56:45,780 [Thread-16] INFO  mapred.LocalJobRunner  -
2013-01-16 13:56:45,781 [Thread-16] INFO  mapred.Task  - Task 'attempt_local_0001_m_000001_0' done.
2013-01-16 13:56:45,782 [Thread-16] WARN  mapred.FileOutputCommitter  - Output path is null in cleanup
2013-01-16 13:56:46,462 [main] INFO  mapred.JobClient  -  map 100% reduce 0%
2013-01-16 13:56:46,462 [main] INFO  mapred.JobClient  - Job complete: job_local_0001
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  - Counters: 7
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -   FileSystemCounters
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     FILE_BYTES_READ=1257
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     FILE_BYTES_WRITTEN=106136
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -   Map-Reduce Framework
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Map input records=1036434
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Spilled Records=0
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Total committed heap usage (bytes)=259915776
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     Map output records=2462684
2013-01-16 13:56:46,463 [main] INFO  mapred.JobClient  -     SPLIT_RAW_BYTES=240



On Wed, Jan 16, 2013 at 11:20 AM, John Vines <vi...@apache.org> wrote:

> The code examples we have scripted simply do the necessary setup for
> creating a mapreduce job and kicking it off. If you check out the code for
> them in
> src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/
> you can see what we're doing in Java to kick off jobs.
>
> The short explanation is, just like any other MapReduce job, we're setting
> up a Job, configuring the AccumuloInput and/or OutputFormats, and sending
> them off like any other MapReduce job.
>
> John
>
>
> On Wed, Jan 16, 2013 at 12:11 PM, Mike Hugo <mi...@piragua.com> wrote:
>
>> I'm writing a client program that uses the BatchWriter and BatchScanner
>> for inserting and querying data, but occasionally it also needs to be able
>> to kick off a Map/Reduce job on a remote Accumulo cluster.  The Map/Reduce
>> examples that ship with Accumulo look like they are meant to be invoked via
>> the command line.  Does anyone have an example of how to kick something off
>> via a Java client running on a separate server?  Any best practices to
>> share?
>> Thanks,
>> Mike
>>
>
>

Re: Programtically invoking a Map/Reduce job

Posted by John Vines <vi...@apache.org>.
The code examples we have scripted simply do the necessary setup for
creating a mapreduce job and kicking it off. If you check out the code for
them in
src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/
you can see what we're doing in Java to kick off jobs.

The short explanation is that, just like any other MapReduce job, we're
setting up a Job, configuring the AccumuloInputFormat and/or
AccumuloOutputFormat, and submitting it.

John


On Wed, Jan 16, 2013 at 12:11 PM, Mike Hugo <mi...@piragua.com> wrote:

> I'm writing a client program that uses the BatchWriter and BatchScanner
> for inserting and querying data, but occasionally it also needs to be able
> to kick off a Map/Reduce job on a remote Accumulo cluster.  The Map/Reduce
> examples that ship with Accumulo look like they are meant to be invoked via
> the command line.  Does anyone have an example of how to kick something off
> via a Java client running on a separate server?  Any best practices to
> share?
> Thanks,
> Mike
>