Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2022/10/06 23:16:12 UTC

[hadoop] branch branch-3.3 updated: MAPREDUCE-7370. Parallelize MultipleOutputs#close call (#4248). Contributed by Ashutosh Gupta.

This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 725cd907125 MAPREDUCE-7370. Parallelize MultipleOutputs#close call (#4248). Contributed by Ashutosh Gupta.
725cd907125 is described below

commit 725cd9071254d1099fa5eb9b03acb203568896f7
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Thu Oct 6 23:23:05 2022 +0100

    MAPREDUCE-7370. Parallelize MultipleOutputs#close call (#4248). Contributed by Ashutosh Gupta.
    
    Reviewed-by: Akira Ajisaka <aa...@apache.org>
    Signed-off-by: Chris Nauroth <cn...@apache.org>
    (cherry picked from commit 062c50db6bebfabae38aec9e17be2483a11c3f7f)
---
 .../apache/hadoop/mapred/lib/MultipleOutputs.java  | 73 ++++++++++++++++++++--
 .../java/org/apache/hadoop/mapreduce/MRConfig.java |  2 +
 .../mapreduce/lib/output/MultipleOutputs.java      | 56 ++++++++++++++++-
 .../hadoop/mapred/lib/TestMultipleOutputs.java     | 20 ++++++
 .../lib/output/TestMRMultipleOutputs.java          | 23 ++++++-
 5 files changed, 165 insertions(+), 9 deletions(-)
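
A minimal, standalone sketch of the pattern this change introduces: each
RecordWriter close is wrapped in a Callable, the callables run on a fixed
thread pool via invokeAll, and any failure is recorded and re-raised once as
a single IOException after every writer has been attempted. The class name,
writer list, and pool size below are illustrative only, not the actual Hadoop
internals; the diff that follows applies the same shape inside both the
mapred.lib and mapreduce.lib.output variants of MultipleOutputs.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class ParallelCloseSketch {
      // Close every writer on a fixed-size pool; remember whether any close failed.
      static void closeAll(List<? extends Closeable> writers, int nThreads) throws IOException {
        AtomicBoolean failed = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        List<Callable<Object>> tasks = new ArrayList<>(writers.size());
        for (Closeable w : writers) {
          tasks.add(() -> {
            try {
              w.close();
            } catch (IOException e) {
              failed.set(true);   // record the failure, keep closing the rest
            }
            return null;
          });
        }
        try {
          pool.invokeAll(tasks);  // blocks until every close attempt has finished
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          pool.shutdown();
        }
        if (failed.get()) {
          throw new IOException("One or more writers failed to close; see prior errors.");
        }
      }
    }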

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
index 3ef6601fbfe..a214420df80 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
@@ -17,15 +17,39 @@
  */
 package org.apache.hadoop.mapred.lib;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-import java.util.*;
-
 /**
  * The MultipleOutputs class simplifies writing to additional outputs other
  * than the job default output via the <code>OutputCollector</code> passed to
@@ -132,6 +156,7 @@ public class MultipleOutputs {
    * Counters group used by the counters of MultipleOutputs.
    */
   private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(MultipleOutputs.class);
 
   /**
    * Checks if a named output is alreadyDefined or not.
@@ -381,6 +406,11 @@ public class MultipleOutputs {
   private Map<String, RecordWriter> recordWriters;
   private boolean countersEnabled;
 
+  @VisibleForTesting
+  synchronized void setRecordWriters(Map<String, RecordWriter> recordWriters) {
+    this.recordWriters = recordWriters;
+  }
+
   /**
    * Creates and initializes multiple named outputs support, it should be
    * instantiated in the Mapper/Reducer configure method.
@@ -528,8 +558,41 @@ public class MultipleOutputs {
    *                             could not be closed properly.
    */
   public void close() throws IOException {
+    int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+        MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+    AtomicBoolean encounteredException = new AtomicBoolean(false);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+        .setUncaughtExceptionHandler(((t, e) -> {
+          LOG.error("Thread " + t + " failed unexpectedly", e);
+          encounteredException.set(true);
+        })).build();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
+
+    List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(null);
+      callableList.add(() -> {
+        try {
+          writer.close(null);
+        } catch (IOException e) {
+          LOG.error("Error while closing MultipleOutput file", e);
+          encounteredException.set(true);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      LOG.warn("Closing is Interrupted");
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (encounteredException.get()) {
+      throw new IOException(
+          "One or more threads encountered exception during close. See prior errors.");
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
index b4d91491e1c..8671eb30b99 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -131,5 +131,7 @@ public interface MRConfig {
   String MASTER_WEBAPP_UI_ACTIONS_ENABLED =
       "mapreduce.webapp.ui-actions.enabled";
   boolean DEFAULT_MASTER_WEBAPP_UI_ACTIONS_ENABLED = true;
+  String MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = "mapreduce.multiple-outputs-close-threads";
+  int DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = 10;
 }
   
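
The key above is read by the patched close() methods; a job can tune it per
run, for example (a hedged sketch, the value 4 is arbitrary):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRConfig;

    public class TuneCloseThreads {
      public static JobConf configure() {
        JobConf conf = new JobConf();
        // "mapreduce.multiple-outputs-close-threads"; defaults to 10 when unset.
        conf.setInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT, 4);
        return conf;
      }
    }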
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
index a3a0e76ab81..05a50303723 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
@@ -19,16 +19,24 @@ package org.apache.hadoop.mapreduce.lib.output;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The MultipleOutputs class simplifies writing output data 
@@ -193,6 +201,8 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
    * Counters group used by the counters of MultipleOutputs.
    */
   private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+  private static final Logger LOG =
+      LoggerFactory.getLogger(org.apache.hadoop.mapred.lib.MultipleOutputs.class);
 
   /**
    * Cache for the taskContexts
@@ -347,6 +357,11 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
     return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
   }
 
+  @VisibleForTesting
+  synchronized void setRecordWriters(Map<String, RecordWriter<?, ?>> recordWriters) {
+    this.recordWriters = recordWriters;
+  }
+
   /**
    * Wraps RecordWriter to increment counters. 
    */
@@ -570,8 +585,43 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
    */
   @SuppressWarnings("unchecked")
   public void close() throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+        MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+    AtomicBoolean encounteredException = new AtomicBoolean(false);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+        .setUncaughtExceptionHandler(((t, e) -> {
+          LOG.error("Thread " + t + " failed unexpectedly", e);
+          encounteredException.set(true);
+        })).build();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
+
+    List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
+
     for (RecordWriter writer : recordWriters.values()) {
-      writer.close(context);
+      callableList.add(() -> {
+        try {
+          writer.close(context);
+        } catch (IOException e) {
+          LOG.error("Error while closing MultipleOutput file", e);
+          encounteredException.set(true);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      LOG.warn("Closing is Interrupted");
+      Thread.currentThread().interrupt();
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (encounteredException.get()) {
+      throw new IOException(
+          "One or more threads encountered exception during close. See prior errors.");
     }
   }
 }
+
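
Caller code does not change with this patch; a typical (simplified, hedged)
reducer still creates MultipleOutputs in setup() and calls close() in
cleanup(), which now fans the writer closes out across the pool. The named
output "text" below is hypothetical and would be registered at job setup:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    public class ExampleReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
      private MultipleOutputs<Text, LongWritable> mos;

      @Override
      protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
      }

      @Override
      protected void reduce(Text key, Iterable<LongWritable> values, Context context)
          throws IOException, InterruptedException {
        for (LongWritable v : values) {
          mos.write("text", key, v);   // "text" is a hypothetical named output
        }
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();  // same call as before; the writers are now closed in parallel
      }
    }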
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
index f3e58930eac..8829a093b13 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
@@ -46,11 +47,16 @@ import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestMultipleOutputs extends HadoopTestCase {
 
@@ -70,6 +76,19 @@ public class TestMultipleOutputs extends HadoopTestCase {
     _testMOWithJavaSerialization(true);
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(expected = IOException.class)
+  public void testParallelCloseIOException() throws IOException {
+    RecordWriter writer = mock(RecordWriter.class);
+    Map<String, RecordWriter> recordWriters = mock(Map.class);
+    when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
+    doThrow(new IOException("test IO exception")).when(writer).close(null);
+    JobConf conf = createJobConf();
+    MultipleOutputs mos = new MultipleOutputs(conf);
+    mos.setRecordWriters(recordWriters);
+    mos.close();
+  }
+
   private static final Path ROOT_DIR = new Path("testing/mo");
   private static final Path IN_DIR = new Path(ROOT_DIR, "input");
   private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
@@ -307,6 +326,7 @@ public class TestMultipleOutputs extends HadoopTestCase {
 
   }
 
+
   @SuppressWarnings({"unchecked"})
   public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
     Text> {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
index babd20e66c4..717163ce243 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,10 +41,15 @@ import org.junit.Test;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestMRMultipleOutputs extends HadoopTestCase {
 
@@ -62,6 +69,20 @@ public class TestMRMultipleOutputs extends HadoopTestCase {
     _testMOWithJavaSerialization(true);
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(expected = IOException.class)
+  public void testParallelCloseIOException() throws IOException, InterruptedException {
+    RecordWriter writer = mock(RecordWriter.class);
+    Map recordWriters = mock(Map.class);
+    when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
+    Mapper.Context taskInputOutputContext = mock(Mapper.Context.class);
+    when(taskInputOutputContext.getConfiguration()).thenReturn(createJobConf());
+    doThrow(new IOException("test IO exception")).when(writer).close(taskInputOutputContext);
+    MultipleOutputs<Long, String> mos = new MultipleOutputs<Long, String>(taskInputOutputContext);
+    mos.setRecordWriters(recordWriters);
+    mos.close();
+  }
+
   private static String localPathRoot = 
     System.getProperty("test.build.data", "/tmp");
   private static final Path ROOT_DIR = new Path(localPathRoot, "testing/mo");
@@ -85,7 +106,7 @@ public class TestMRMultipleOutputs extends HadoopTestCase {
     fs.delete(ROOT_DIR, true);
     super.tearDown();
   }
-  
+
   protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
     String input = "a\nb\nc\nd\ne\nc\nd\ne";
 

