Posted to dev@avro.apache.org by "Stan Rosenberg (JIRA)" <ji...@apache.org> on 2018/02/02 21:04:00 UTC

[jira] [Comment Edited] (AVRO-2138) org.apache.avro.mapreduce.AvroMultipleOutputs.write copies Configuration on every invocation of write

    [ https://issues.apache.org/jira/browse/AVRO-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345988#comment-16345988 ] 

Stan Rosenberg edited comment on AVRO-2138 at 2/2/18 9:03 PM:
--------------------------------------------------------------

{code:java}
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

/**
 * Extends Avro's implementation to avoid the unnecessary copying of the Configuration that happens on _every_ invocation of
 *      write(key, value, keySchema, valSchema, baseOutputPath).
 *
 * See https://issues.apache.org/jira/browse/AVRO-2138
 */
public class PatchedAvroMultipleOutputs extends org.apache.avro.mapreduce.AvroMultipleOutputs {
    private Map<String, RecordWriter<?, ?>> recordWriters = new HashMap<>();
    private TaskAttemptContext taskContext;

    public PatchedAvroMultipleOutputs(TaskInputOutputContext<?, ?, ?, ?> context) {
        super(context);
        try {
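            // Copy the Configuration once, at construction time, rather than on every write().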
            taskContext = createTaskAttemptContext(new Job(context.getConfiguration()).getConfiguration(), context.getTaskAttemptID());
        } catch (IOException ex) {
            throw new IllegalStateException("Unable to instantiate PatchedAvroMultipleOutputs", ex);
        }
    }

    @Override
    public void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath) throws IOException, InterruptedException {
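        // NOTE: unlike the upstream implementation, keySchema/valSchema are not (re-)applied
        // per call; the Avro output schemas are assumed to have been configured on the job up front.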
        Preconditions.checkArgument(!baseOutputPath.equals("part"), "output name cannot be 'part'");

        getRecordWriter(taskContext, baseOutputPath).write(key, value);
    }

    private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext, String baseFileName)
            throws IOException, InterruptedException {

        // look for record-writer in the cache
        RecordWriter writer = recordWriters.get(baseFileName);

        // If not in cache, create a new one
        if (writer == null) {
            // get the record writer from context output format
            //FileOutputFormat.setOutputName(taskContext, baseFileName);
            taskContext.getConfiguration().set("avro.mo.config.namedOutput", baseFileName);
            try {
                writer = ((OutputFormat) ReflectionUtils.newInstance(
                        taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
                        .getRecordWriter(taskContext);
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
            // add the record-writer to the cache
            recordWriters.put(baseFileName, writer);
        }
        return writer;
    }

    private TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
        // Use reflection since the context types changed incompatibly between 1.0
        // and 2.0.
        try {
            Class<?> c = getTaskAttemptContextClass();
            Constructor<?> cons = c.getConstructor(Configuration.class,
                    TaskAttemptID.class);
            return (TaskAttemptContext) cons.newInstance(conf, taskId);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    private Class<?> getTaskAttemptContextClass() {
        try {
            return Class.forName(
                    "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
        } catch (Exception e) {
            try {
                return Class.forName(
                        "org.apache.hadoop.mapreduce.TaskAttemptContext");
            } catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    @Override
    public void close() throws IOException, InterruptedException {
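        // Close the record writers cached by this class, then let the base class
        // close any writers it created for named outputs.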
        for (RecordWriter writer : recordWriters.values()) {
            writer.close(taskContext);
        }
        super.close();
    }
}
{code}
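
For reference, a minimal sketch of how the patched class would be wired into a reducer. The class, schemas, and output names below are illustrative, not part of the patch; it assumes the job's output format and Avro output schemas were configured up front (e.g. via AvroJob), since the patched {{write}} does not re-apply the per-call schemas.

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ExampleReducer extends Reducer<AvroKey<String>, AvroValue<Long>, NullWritable, NullWritable> {
    private PatchedAvroMultipleOutputs amos;

    @Override
    protected void setup(Context context) {
        // Drop-in replacement for AvroMultipleOutputs; the Configuration is copied once, here.
        amos = new PatchedAvroMultipleOutputs(context);
    }

    @Override
    protected void reduce(AvroKey<String> key, Iterable<AvroValue<Long>> values, Context context)
            throws IOException, InterruptedException {
        for (AvroValue<Long> value : values) {
            // No per-record Configuration copy on this hot path.
            amos.write(key, value, Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.LONG),
                    "out/" + key.datum());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        amos.close();  // closes all cached record writers
    }
}
{code}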


> org.apache.avro.mapreduce.AvroMultipleOutputs.write copies Configuration on every invocation of write
> -----------------------------------------------------------------------------------------------------
>
>                 Key: AVRO-2138
>                 URL: https://issues.apache.org/jira/browse/AVRO-2138
>             Project: Avro
>          Issue Type: Bug
>    Affects Versions: 1.8.2
>            Reporter: Stan Rosenberg
>            Priority: Major
>
> While profiling a Spark job that uses AvroMultipleOutputs, I noticed that a great deal of time is spent copying the (Hadoop) Configuration.  This copy happens on _every_ invocation of {{write}}: [https://github.com/apache/avro/blob/master/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java#L437]
> After patching, I see a speed-up of 2x or more in the running time of the same job.
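
For context, the hot path in the unpatched {{write(key, value, keySchema, valSchema, baseOutputPath)}} looks roughly like this (paraphrased from the linked source, not verbatim):

{code:java}
// Unpatched hot path, paraphrased: all of this runs once per record.
Job job = new Job(context.getConfiguration());  // copies the entire Configuration
setSchema(job, keySchema, valSchema);
TaskAttemptContext taskContext =
        createTaskAttemptContext(job.getConfiguration(), context.getTaskAttemptID());
getRecordWriter(taskContext, baseOutputPath).write(key, value);
{code}

The patch above hoists the Configuration copy into the constructor, so it happens once per task rather than once per record.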


