Posted to mapreduce-issues@hadoop.apache.org by "Allen Wittenauer (JIRA)" <ji...@apache.org> on 2014/07/31 02:21:40 UTC

[jira] [Resolved] (MAPREDUCE-2123) Multiple threads per JVM

     [ https://issues.apache.org/jira/browse/MAPREDUCE-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Allen Wittenauer resolved MAPREDUCE-2123.
-----------------------------------------

    Resolution: Not a Problem

> Multiple threads per JVM
> ------------------------
>
>                 Key: MAPREDUCE-2123
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2123
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>            Reporter: Randy Wilson
>
> I have a process that standardizes name and place strings and requires access to Java objects that need a lot of RAM (800MB).  Hadoop (via Amazon Elastic MapReduce) was running out of memory because it was firing up one JVM per task per slave.  Each JVM needed 1.5GB, and six of those exhausted the available memory.
> In this case, we don't need 6 different JVMs running--we only need one, but with multiple threads.  I tried using a MultithreadedMapper, but it doesn't have a thread-safe "run()" method: it makes 3 calls to the input source to read one "line", which doesn't work if multiple threads are doing that at the same time.  So I had to override the run() method.  I ended up having to do so much work to override the run() method that it was simpler not to use the MultithreadedMapper at all.  Instead, I took my original mapper and just overrode the run() method there directly.  I fired up n threads, each of which called a method that had a synchronized(mutex) block around the part of the process that made the three calls to the input source to get the next line to operate on.  Then, outside of the synchronized block, it called the map() method, which made use of the large, shared (singleton) standardization object.
> All of this made me wonder why Hadoop fires up multiple JVMs per slave in the first place--that is a lot of overhead to use per thread.   I've also been warned that continually reusing JVMs instead of starting a new one per task will use up more memory.  That seems like it should only be true if Hadoop (or our mapper) is leaking memory.  If that's the case, let's get it fixed.
> My guess is that since Hadoop can run tasks in languages other than Java--and since other languages may have less overhead per process--firing up a JVM per task (or per thread) simplifies Hadoop.  But the multithreaded solution we did was very general-purpose.  It seems like it ought to be pretty much the default solution in Java, and a "...map.threads" property should be all that is required to fire up that many threads to help with each task, rather than having to jump through the hoops we had to.
> Below is the implementation that seems to be working:
> In the main class:
>     Configuration config = getConf();
>     config.set("num_threads_per_jvm", Integer.toString(numThreads));
>     Job job = new Job(config, "Standardize stuff");
> In the Mapper class:
>   public void run(final Context context) throws IOException, InterruptedException {
>     int numThreads = Integer.parseInt(context.getConfiguration().get("num_threads_per_jvm"));
>     setup(context); // setup and cleanup just once, rather than once per thread
>     List<MapRunner> mapRunners = new ArrayList<MapRunner>();
>     for (int i = 0; i < numThreads; i++) {
>       MapRunner mapRunner = new MapRunner(context); // the constructor takes only the shared context
>       mapRunners.add(mapRunner);
>       mapRunner.start();
>     }
>     // Wait for all the threads to complete
>     for (MapRunner mapRunner : mapRunners) {
>       mapRunner.join();
>     }
>     cleanup(context);
>   }
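>   // Shared by all MapRunner threads; guards reads from the context
>   private final Object contextMutex = new Object();
>   private class MapRunner extends Thread {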
>     final Context context;
>     private MapRunner(Context context) {
>       this.context = context;
>     }
>     @Override
>     public void run() {
>       boolean gotValue = true;
>       do {
>         try {
>           Text key = null;
>           Text value = null;
>           synchronized(contextMutex) {
>             gotValue = context.nextKeyValue();
>             if (gotValue) {
>               // Copy the record: the reader may reuse these objects on the next read
>               key = new Text(context.getCurrentKey());
>               value = new Text(context.getCurrentValue());
>             }
>           }
>           if (gotValue) {
>             map(key, value, context);
>           }
>         } catch (Exception e) {
>           throw new RuntimeException(e);
>         }
>       } while (gotValue);
>     }
>   }
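
The locking pattern described above can be sketched without any Hadoop types. This is only an illustration using the JDK: the class SharedReaderDemo, the standardize() method, and the use of a plain Iterator in place of the task's input source are all hypothetical stand-ins, not part of the original mapper. Each worker takes the lock only long enough to fetch the next record, then does the expensive work outside the lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedQueue;

final class SharedReaderDemo {

    // Hypothetical stand-in for the large, shared standardization object.
    static String standardize(String raw) {
        return raw.trim().toUpperCase(Locale.ROOT);
    }

    // Runs numThreads workers over one shared, non-thread-safe reader.
    // The order of the returned results is not deterministic.
    static List<String> process(List<String> input, int numThreads)
            throws InterruptedException {
        final Iterator<String> reader = input.iterator(); // not thread-safe
        final Object readerLock = new Object();
        final ConcurrentLinkedQueue<String> output = new ConcurrentLinkedQueue<>();

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            Thread worker = new Thread(() -> {
                while (true) {
                    String record;
                    // Hold the lock only while advancing the reader.
                    synchronized (readerLock) {
                        if (!reader.hasNext()) {
                            return; // input exhausted, this worker is done
                        }
                        record = reader.next();
                    }
                    // The expensive work happens outside the lock.
                    output.add(standardize(record));
                }
            });
            workers.add(worker);
            worker.start();
        }
        // Wait for all workers to finish before collecting results.
        for (Thread worker : workers) {
            worker.join();
        }
        return new ArrayList<>(output);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> results = process(Arrays.asList(" alice ", "bob", " carol"), 3);
        System.out.println(results.size() + " records processed");
    }
}
```

Java strings are immutable, so handing a record to a worker after releasing the lock is safe in this sketch. With a reader that reuses its key and value objects between reads, each worker would need to copy the record while still holding the lock, and writes to a shared output would need their own synchronization.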


