You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@avro.apache.org by "Stan Rosenberg (JIRA)" <ji...@apache.org> on 2018/02/02 21:04:00 UTC
[jira] [Comment Edited] (AVRO-2138)
org.apache.avro.mapreduce.AvroMultipleOutputs.write copies Configuration on
every invocation of write
[ https://issues.apache.org/jira/browse/AVRO-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345988#comment-16345988 ]
Stan Rosenberg edited comment on AVRO-2138 at 2/2/18 9:03 PM:
--------------------------------------------------------------
{code:java}
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
/**
 * Extension of Avro's {@code AvroMultipleOutputs} that avoids the unnecessary
 * {@link Configuration} copying which happens on <em>every</em> invocation of
 * {@code write(key, value, keySchema, valSchema, baseOutputPath)}.
 *
 * <p>One {@link TaskAttemptContext} is built in the constructor, and one {@link RecordWriter}
 * per base output path is created lazily, cached, and reused until {@link #close()} is called.
 *
 * <p>The {@code keySchema} and {@code valSchema} arguments of {@code write} are not consulted by
 * this class; the writer is obtained from the output format configured on the task context.
 *
 * @see <a href="https://issues.apache.org/jira/browse/AVRO-2138">AVRO-2138</a>
 */
public class PatchedAvroMultipleOutputs extends org.apache.avro.mapreduce.AvroMultipleOutputs {

  /** Configuration key that is set to the base output name before a record writer is created. */
  private static final String NAMED_OUTPUT_KEY = "avro.mo.config.namedOutput";

  /** Base output name rejected by {@code write}. */
  private static final String RESERVED_OUTPUT_NAME = "part";

  /** Cache of record writers keyed by base output path; guarded by {@code this}. */
  private final Map<String, RecordWriter<Object, Object>> recordWriters = new HashMap<>();

  /** Single task context shared by every cached writer. */
  private final TaskAttemptContext taskContext;

  /**
   * Creates the multiple-outputs helper and the task context shared by all of its writers.
   *
   * @param context the context of the running task
   * @throws IllegalStateException if the shared task context cannot be created
   */
  public PatchedAvroMultipleOutputs(TaskInputOutputContext<?, ?, ?, ?> context) {
    super(context);
    this.taskContext = newTaskContext(context);
  }

  /**
   * Writes a key/value pair to the output identified by {@code baseOutputPath}, creating and
   * caching the record writer on first use.
   *
   * @param key the key to write
   * @param value the value to write
   * @param keySchema not used by this implementation
   * @param valSchema not used by this implementation
   * @param baseOutputPath base output name; must not be {@code null} or {@code "part"}
   * @throws NullPointerException if {@code baseOutputPath} is {@code null}
   * @throws IllegalArgumentException if {@code baseOutputPath} is {@code "part"}
   * @throws IOException if the writer cannot be created or the write fails
   * @throws InterruptedException if the underlying writer is interrupted
   */
  @Override
  public void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath)
      throws IOException, InterruptedException {
    Preconditions.checkNotNull(baseOutputPath, "baseOutputPath must not be null");
    Preconditions.checkArgument(
        !RESERVED_OUTPUT_NAME.equals(baseOutputPath), "output name cannot be 'part'");
    getRecordWriter(baseOutputPath).write(key, value);
  }

  /**
   * Closes every cached record writer and then the parent.
   *
   * <p>The parent is closed even when closing one of the cached writers fails.
   *
   * @throws IOException if closing a cached writer or the parent fails
   * @throws InterruptedException if closing is interrupted
   */
  @Override
  public void close() throws IOException, InterruptedException {
    try {
      closeCachedWriters();
    } finally {
      super.close();
    }
  }

  /**
   * Attempts to close all cached writers. The first {@link IOException} is rethrown once every
   * writer has been attempted; later ones are attached to it as suppressed exceptions.
   */
  private synchronized void closeCachedWriters() throws IOException, InterruptedException {
    IOException firstFailure = null;
    try {
      for (RecordWriter<Object, Object> writer : recordWriters.values()) {
        try {
          writer.close(taskContext);
        } catch (IOException e) {
          if (firstFailure == null) {
            firstFailure = e;
          } else {
            firstFailure.addSuppressed(e);
          }
        }
      }
    } finally {
      // Drop the references so a repeated close() does not close the same writers twice.
      recordWriters.clear();
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /** Returns the cached writer for {@code baseFileName}, creating it on first use. */
  private synchronized RecordWriter<Object, Object> getRecordWriter(String baseFileName)
      throws IOException, InterruptedException {
    RecordWriter<Object, Object> writer = recordWriters.get(baseFileName);
    if (writer == null) {
      Configuration conf = taskContext.getConfiguration();
      // The name must be in the configuration before the writer is requested.
      conf.set(NAMED_OUTPUT_KEY, baseFileName);
      writer = newOutputFormat(conf).getRecordWriter(taskContext);
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  /** Instantiates the output format configured on the shared task context. */
  @SuppressWarnings("unchecked") // keys and values are passed through as plain Objects
  private OutputFormat<Object, Object> newOutputFormat(Configuration conf) throws IOException {
    try {
      return (OutputFormat<Object, Object>)
          ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /** Builds the task context that is shared by all writers of this instance. */
  @SuppressWarnings("deprecation") // NOTE(review): Job(Configuration) presumably kept for old Hadoop - confirm
  private static TaskAttemptContext newTaskContext(TaskInputOutputContext<?, ?, ?, ?> context) {
    try {
      // NOTE(review): going through Job presumably yields a private copy of the configuration,
      // so setting the named-output key later does not touch the caller's - confirm.
      Configuration conf = new Job(context.getConfiguration()).getConfiguration();
      return createTaskAttemptContext(conf, context.getTaskAttemptID());
    } catch (IOException ex) {
      throw new IllegalStateException("Unable to instantiate PatchedAvroMultipleOutputs", ex);
    }
  }

  /**
   * Creates a task attempt context via reflection, since the context types changed
   * incompatibly between Hadoop 1.0 and 2.0.
   */
  private static TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
    try {
      Constructor<?> constructor =
          getTaskAttemptContextClass().getConstructor(Configuration.class, TaskAttemptID.class);
      return (TaskAttemptContext) constructor.newInstance(conf, taskId);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Unable to create a TaskAttemptContext", e);
    }
  }

  /** Resolves {@code TaskAttemptContextImpl} if present, otherwise {@code TaskAttemptContext}. */
  private static Class<?> getTaskAttemptContextClass() {
    try {
      return Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
    } catch (ClassNotFoundException implMissing) {
      try {
        return Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
      } catch (ClassNotFoundException ex) {
        throw new IllegalStateException("No TaskAttemptContext class found on the classpath", ex);
      }
    }
  }
}
{code}
was (Author: srosenberg):
{code:java}
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
/**
 * Extension of Avro's {@code AvroMultipleOutputs} that avoids the unnecessary
 * {@link Configuration} copying which happens on <em>every</em> invocation of
 * {@code write(key, value, keySchema, valSchema, baseOutputPath)}.
 *
 * <p>One {@link TaskAttemptContext} is built in the constructor, and one {@link RecordWriter}
 * per base output path is created lazily, cached, and reused until {@link #close()} is called.
 *
 * <p>The {@code keySchema} and {@code valSchema} arguments of {@code write} are not consulted by
 * this class; the writer is obtained from the output format configured on the task context.
 *
 * @see <a href="https://issues.apache.org/jira/browse/AVRO-2138">AVRO-2138</a>
 */
public class PatchedAvroMultipleOutputs extends org.apache.avro.mapreduce.AvroMultipleOutputs {

  /** Configuration key that is set to the base output name before a record writer is created. */
  private static final String NAMED_OUTPUT_KEY = "avro.mo.config.namedOutput";

  /** Base output name rejected by {@code write}. */
  private static final String RESERVED_OUTPUT_NAME = "part";

  /** Cache of record writers keyed by base output path; guarded by {@code this}. */
  private final Map<String, RecordWriter<Object, Object>> recordWriters = new HashMap<>();

  /** Single task context shared by every cached writer. */
  private final TaskAttemptContext taskContext;

  /**
   * Creates the multiple-outputs helper and the task context shared by all of its writers.
   *
   * @param context the context of the running task
   * @throws IllegalStateException if the shared task context cannot be created
   */
  public PatchedAvroMultipleOutputs(TaskInputOutputContext<?, ?, ?, ?> context) {
    super(context);
    this.taskContext = newTaskContext(context);
  }

  /**
   * Writes a key/value pair to the output identified by {@code baseOutputPath}, creating and
   * caching the record writer on first use.
   *
   * @param key the key to write
   * @param value the value to write
   * @param keySchema not used by this implementation
   * @param valSchema not used by this implementation
   * @param baseOutputPath base output name; must not be {@code null} or {@code "part"}
   * @throws NullPointerException if {@code baseOutputPath} is {@code null}
   * @throws IllegalArgumentException if {@code baseOutputPath} is {@code "part"}
   * @throws IOException if the writer cannot be created or the write fails
   * @throws InterruptedException if the underlying writer is interrupted
   */
  @Override
  public void write(Object key, Object value, Schema keySchema, Schema valSchema, String baseOutputPath)
      throws IOException, InterruptedException {
    Preconditions.checkNotNull(baseOutputPath, "baseOutputPath must not be null");
    Preconditions.checkArgument(
        !RESERVED_OUTPUT_NAME.equals(baseOutputPath), "output name cannot be 'part'");
    getRecordWriter(baseOutputPath).write(key, value);
  }

  /**
   * Closes every record writer cached by this class and then the parent.
   *
   * <p>Without this override the writers created by {@code write} would never be closed.
   * The parent is closed even when closing one of the cached writers fails.
   *
   * @throws IOException if closing a cached writer or the parent fails
   * @throws InterruptedException if closing is interrupted
   */
  @Override
  public void close() throws IOException, InterruptedException {
    try {
      closeCachedWriters();
    } finally {
      super.close();
    }
  }

  /**
   * Attempts to close all cached writers. The first {@link IOException} is rethrown once every
   * writer has been attempted; later ones are attached to it as suppressed exceptions.
   */
  private synchronized void closeCachedWriters() throws IOException, InterruptedException {
    IOException firstFailure = null;
    try {
      for (RecordWriter<Object, Object> writer : recordWriters.values()) {
        try {
          writer.close(taskContext);
        } catch (IOException e) {
          if (firstFailure == null) {
            firstFailure = e;
          } else {
            firstFailure.addSuppressed(e);
          }
        }
      }
    } finally {
      // Drop the references so a repeated close() does not close the same writers twice.
      recordWriters.clear();
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /** Returns the cached writer for {@code baseFileName}, creating it on first use. */
  private synchronized RecordWriter<Object, Object> getRecordWriter(String baseFileName)
      throws IOException, InterruptedException {
    RecordWriter<Object, Object> writer = recordWriters.get(baseFileName);
    if (writer == null) {
      Configuration conf = taskContext.getConfiguration();
      // The name must be in the configuration before the writer is requested.
      conf.set(NAMED_OUTPUT_KEY, baseFileName);
      writer = newOutputFormat(conf).getRecordWriter(taskContext);
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  /** Instantiates the output format configured on the shared task context. */
  @SuppressWarnings("unchecked") // keys and values are passed through as plain Objects
  private OutputFormat<Object, Object> newOutputFormat(Configuration conf) throws IOException {
    try {
      return (OutputFormat<Object, Object>)
          ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /** Builds the task context that is shared by all writers of this instance. */
  @SuppressWarnings("deprecation") // NOTE(review): Job(Configuration) presumably kept for old Hadoop - confirm
  private static TaskAttemptContext newTaskContext(TaskInputOutputContext<?, ?, ?, ?> context) {
    try {
      // NOTE(review): going through Job presumably yields a private copy of the configuration,
      // so setting the named-output key later does not touch the caller's - confirm.
      Configuration conf = new Job(context.getConfiguration()).getConfiguration();
      return createTaskAttemptContext(conf, context.getTaskAttemptID());
    } catch (IOException ex) {
      throw new IllegalStateException("Unable to instantiate PatchedAvroMultipleOutputs", ex);
    }
  }

  /**
   * Creates a task attempt context via reflection, since the context types changed
   * incompatibly between Hadoop 1.0 and 2.0.
   */
  private static TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
    try {
      Constructor<?> constructor =
          getTaskAttemptContextClass().getConstructor(Configuration.class, TaskAttemptID.class);
      return (TaskAttemptContext) constructor.newInstance(conf, taskId);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Unable to create a TaskAttemptContext", e);
    }
  }

  /** Resolves {@code TaskAttemptContextImpl} if present, otherwise {@code TaskAttemptContext}. */
  private static Class<?> getTaskAttemptContextClass() {
    try {
      return Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
    } catch (ClassNotFoundException implMissing) {
      try {
        return Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
      } catch (ClassNotFoundException ex) {
        throw new IllegalStateException("No TaskAttemptContext class found on the classpath", ex);
      }
    }
  }
}
{code}
> org.apache.avro.mapreduce.AvroMultipleOutputs.write copies Configuration on every invocation of write
> -----------------------------------------------------------------------------------------------------
>
> Key: AVRO-2138
> URL: https://issues.apache.org/jira/browse/AVRO-2138
> Project: Avro
> Issue Type: Bug
> Affects Versions: 1.8.2
> Reporter: Stan Rosenberg
> Priority: Major
>
> While profiling a Spark job that uses AvroMultipleOutputs, I noticed that a great deal of time is spent copying the Hadoop Configuration. This happens on _every_ invocation of {{write}}: [https://github.com/apache/avro/blob/master/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java#L437]
> After patching, I am seeing a speed-up of 2x and above in the running time of the same job.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)