You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/10 13:55:53 UTC
[3/3] flink git commit: [FLINK-2617] [hadoop-compat] Added static
mutexes for configure, open, close HadoopFormats
[FLINK-2617] [hadoop-compat] Added static mutexes for configure, open, close HadoopFormats
This closes #1111
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8754352f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8754352f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8754352f
Branch: refs/heads/master
Commit: 8754352ff53cd1ab621d6c97f7e5baac369b5c28
Parents: 16fb4e9
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 9 14:32:21 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 10 12:35:48 2015 +0200
----------------------------------------------------------------------
.../hadoop/mapred/HadoopInputFormatBase.java | 46 ++++--
.../hadoop/mapred/HadoopOutputFormatBase.java | 101 +++++++------
.../hadoop/mapreduce/HadoopInputFormatBase.java | 55 ++++---
.../mapreduce/HadoopOutputFormatBase.java | 142 +++++++++++--------
4 files changed, 212 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8754352f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index 932b7de..356f7ad 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -59,6 +59,14 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
+ // Mutexes to avoid concurrent operations on Hadoop InputFormats.
+ // Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+ // In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
+ // might be used in the same JVM.
+ private static final Object OPEN_MUTEX = new Object();
+ private static final Object CONFIGURE_MUTEX = new Object();
+ private static final Object CLOSE_MUTEX = new Object();
+
private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
protected Class<K> keyClass;
protected Class<V> valueClass;
@@ -91,12 +99,15 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
@Override
public void configure(Configuration parameters) {
- // configure MR InputFormat if necessary
- if(this.mapredInputFormat instanceof Configurable) {
- ((Configurable)this.mapredInputFormat).setConf(this.jobConf);
- }
- else if(this.mapredInputFormat instanceof JobConfigurable) {
- ((JobConfigurable)this.mapredInputFormat).configure(this.jobConf);
+
+ // enforce sequential configure() calls
+ synchronized (CONFIGURE_MUTEX) {
+ // configure MR InputFormat if necessary
+ if (this.mapredInputFormat instanceof Configurable) {
+ ((Configurable) this.mapredInputFormat).setConf(this.jobConf);
+ } else if (this.mapredInputFormat instanceof JobConfigurable) {
+ ((JobConfigurable) this.mapredInputFormat).configure(this.jobConf);
+ }
}
}
@@ -148,13 +159,18 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
@Override
public void open(HadoopInputSplit split) throws IOException {
- this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
- if (this.recordReader instanceof Configurable) {
- ((Configurable) this.recordReader).setConf(jobConf);
+
+ // enforce sequential open() calls
+ synchronized (OPEN_MUTEX) {
+
+ this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+ if (this.recordReader instanceof Configurable) {
+ ((Configurable) this.recordReader).setConf(jobConf);
+ }
+ key = this.recordReader.createKey();
+ value = this.recordReader.createValue();
+ this.fetched = false;
}
- key = this.recordReader.createKey();
- value = this.recordReader.createValue();
- this.fetched = false;
}
@Override
@@ -172,7 +188,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
@Override
public void close() throws IOException {
- this.recordReader.close();
+
+ // enforce sequential close() calls
+ synchronized (CLOSE_MUTEX) {
+ this.recordReader.close();
+ }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8754352f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index 456003f..40214f2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -54,6 +54,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
private static final long serialVersionUID = 1L;
+ // Mutexes to avoid concurrent operations on Hadoop OutputFormats.
+ // Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+ // In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
+ // might be used in the same JVM.
+ private static final Object OPEN_MUTEX = new Object();
+ private static final Object CONFIGURE_MUTEX = new Object();
+ private static final Object CLOSE_MUTEX = new Object();
+
private JobConf jobConf;
private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
protected transient RecordWriter<K,V> recordWriter;
@@ -77,12 +85,15 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
@Override
public void configure(Configuration parameters) {
- // configure MR OutputFormat if necessary
- if(this.mapredOutputFormat instanceof Configurable) {
- ((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
- }
- else if(this.mapredOutputFormat instanceof JobConfigurable) {
- ((JobConfigurable)this.mapredOutputFormat).configure(this.jobConf);
+
+ // enforce sequential configure() calls
+ synchronized (CONFIGURE_MUTEX) {
+ // configure MR OutputFormat if necessary
+ if (this.mapredOutputFormat instanceof Configurable) {
+ ((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
+ } else if (this.mapredOutputFormat instanceof JobConfigurable) {
+ ((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
+ }
}
}
@@ -94,39 +105,43 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- if (Integer.toString(taskNumber + 1).length() > 6) {
- throw new IOException("Task id too large.");
- }
-
- TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
- + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
- + Integer.toString(taskNumber + 1)
- + "_0");
- this.jobConf.set("mapred.task.id", taskAttemptID.toString());
- this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
- // for hadoop 2.2
- this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
- this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
-
- try {
- this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
- } catch (Exception e) {
- throw new RuntimeException(e);
+ // enforce sequential open() calls
+ synchronized (OPEN_MUTEX) {
+ if (Integer.toString(taskNumber + 1).length() > 6) {
+ throw new IOException("Task id too large.");
+ }
+
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+ + Integer.toString(taskNumber + 1)
+ + "_0");
+
+ this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+ this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+ // for hadoop 2.2
+ this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+ this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+
+ try {
+ this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ this.outputCommitter = this.jobConf.getOutputCommitter();
+
+ JobContext jobContext;
+ try {
+ jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ this.outputCommitter.setupJob(jobContext);
+
+ this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
}
-
- this.outputCommitter = this.jobConf.getOutputCommitter();
-
- JobContext jobContext;
- try {
- jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- this.outputCommitter.setupJob(jobContext);
-
- this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
}
/**
@@ -135,10 +150,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
*/
@Override
public void close() throws IOException {
- this.recordWriter.close(new HadoopDummyReporter());
-
- if (this.outputCommitter.needsTaskCommit(this.context)) {
- this.outputCommitter.commitTask(this.context);
+
+ // enforce sequential close() calls
+ synchronized (CLOSE_MUTEX) {
+ this.recordWriter.close(new HadoopDummyReporter());
+
+ if (this.outputCommitter.needsTaskCommit(this.context)) {
+ this.outputCommitter.commitTask(this.context);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8754352f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 09435e2..e9b23f7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -59,6 +59,14 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
+ // Mutexes to avoid concurrent operations on Hadoop InputFormats.
+ // Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+ // In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
+ // might be used in the same JVM.
+ private static final Object OPEN_MUTEX = new Object();
+ private static final Object CONFIGURE_MUTEX = new Object();
+ private static final Object CLOSE_MUTEX = new Object();
+
// NOTE: this class is using a custom serialization logic, without a defaultWriteObject() method.
// Hence, all fields here are "transient".
private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
@@ -89,8 +97,12 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
@Override
public void configure(Configuration parameters) {
- if (mapreduceInputFormat instanceof Configurable) {
- ((Configurable) mapreduceInputFormat).setConf(configuration);
+
+ // enforce sequential configure() calls
+ synchronized (CONFIGURE_MUTEX) {
+ if (mapreduceInputFormat instanceof Configurable) {
+ ((Configurable) mapreduceInputFormat).setConf(configuration);
+ }
}
}
@@ -169,21 +181,26 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
@Override
public void open(HadoopInputSplit split) throws IOException {
- TaskAttemptContext context;
- try {
- context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- try {
- this.recordReader = this.mapreduceInputFormat
- .createRecordReader(split.getHadoopInputSplit(), context);
- this.recordReader.initialize(split.getHadoopInputSplit(), context);
- } catch (InterruptedException e) {
- throw new IOException("Could not create RecordReader.", e);
- } finally {
- this.fetched = false;
+ // enforce sequential open() calls
+ synchronized (OPEN_MUTEX) {
+
+ TaskAttemptContext context;
+ try {
+ context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ this.recordReader = this.mapreduceInputFormat
+ .createRecordReader(split.getHadoopInputSplit(), context);
+ this.recordReader.initialize(split.getHadoopInputSplit(), context);
+ } catch (InterruptedException e) {
+ throw new IOException("Could not create RecordReader.", e);
+ } finally {
+ this.fetched = false;
+ }
}
}
@@ -207,7 +224,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
@Override
public void close() throws IOException {
- this.recordReader.close();
+
+ // enforce sequential close() calls
+ synchronized (CLOSE_MUTEX) {
+ this.recordReader.close();
+ }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8754352f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 72c105b..dc475e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -49,6 +49,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
private static final long serialVersionUID = 1L;
+ // Mutexes to avoid concurrent operations on Hadoop OutputFormats.
+ // Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+ // In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
+ // might be used in the same JVM.
+ private static final Object OPEN_MUTEX = new Object();
+ private static final Object CONFIGURE_MUTEX = new Object();
+ private static final Object CLOSE_MUTEX = new Object();
+
private org.apache.hadoop.conf.Configuration configuration;
private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
protected transient RecordWriter<K,V> recordWriter;
@@ -73,8 +81,12 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
@Override
public void configure(Configuration parameters) {
- if(this.mapreduceOutputFormat instanceof Configurable){
- ((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);
+
+ // enforce sequential configure() calls
+ synchronized (CONFIGURE_MUTEX) {
+ if (this.mapreduceOutputFormat instanceof Configurable) {
+ ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
+ }
}
}
@@ -86,49 +98,53 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- if (Integer.toString(taskNumber + 1).length() > 6) {
- throw new IOException("Task id too large.");
- }
-
- this.taskNumber = taskNumber+1;
- // for hadoop 2.2
- this.configuration.set("mapreduce.output.basename", "tmp");
+ // enforce sequential open() calls
+ synchronized (OPEN_MUTEX) {
+ if (Integer.toString(taskNumber + 1).length() > 6) {
+ throw new IOException("Task id too large.");
+ }
- TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
- + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
- + Integer.toString(taskNumber + 1)
- + "_0");
+ this.taskNumber = taskNumber + 1;
- this.configuration.set("mapred.task.id", taskAttemptID.toString());
- this.configuration.setInt("mapred.task.partition", taskNumber + 1);
- // for hadoop 2.2
- this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
- this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
-
- try {
- this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
- this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
- this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ // for hadoop 2.2
+ this.configuration.set("mapreduce.output.basename", "tmp");
- this.context.getCredentials().addAll(this.credentials);
- Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
- if(currentUserCreds != null) {
- this.context.getCredentials().addAll(currentUserCreds);
- }
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+ + Integer.toString(taskNumber + 1)
+ + "_0");
- // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
- if(outputCommitter instanceof FileOutputCommitter) {
- this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter)this.outputCommitter).getWorkPath().toString());
- }
-
- try {
- this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
- } catch (InterruptedException e) {
- throw new IOException("Could not create RecordWriter.", e);
+ this.configuration.set("mapred.task.id", taskAttemptID.toString());
+ this.configuration.setInt("mapred.task.partition", taskNumber + 1);
+ // for hadoop 2.2
+ this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+ this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
+
+ try {
+ this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+ this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
+ this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ this.context.getCredentials().addAll(this.credentials);
+ Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+ if (currentUserCreds != null) {
+ this.context.getCredentials().addAll(currentUserCreds);
+ }
+
+ // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+ if (outputCommitter instanceof FileOutputCommitter) {
+ this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter) this.outputCommitter).getWorkPath().toString());
+ }
+
+ try {
+ this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+ } catch (InterruptedException e) {
+ throw new IOException("Could not create RecordWriter.", e);
+ }
}
}
@@ -138,27 +154,31 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
*/
@Override
public void close() throws IOException {
- try {
- this.recordWriter.close(this.context);
- } catch (InterruptedException e) {
- throw new IOException("Could not close RecordReader.", e);
- }
-
- if (this.outputCommitter.needsTaskCommit(this.context)) {
- this.outputCommitter.commitTask(this.context);
- }
-
- Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-
- // rename tmp-file to final name
- FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
-
- String taskNumberStr = Integer.toString(this.taskNumber);
- String tmpFileTemplate = "tmp-r-00000";
- String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
-
- if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
- fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+
+ // enforce sequential close() calls
+ synchronized (CLOSE_MUTEX) {
+ try {
+ this.recordWriter.close(this.context);
+ } catch (InterruptedException e) {
+ throw new IOException("Could not close RecordReader.", e);
+ }
+
+ if (this.outputCommitter.needsTaskCommit(this.context)) {
+ this.outputCommitter.commitTask(this.context);
+ }
+
+ Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+ // rename tmp-file to final name
+ FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+
+ String taskNumberStr = Integer.toString(this.taskNumber);
+ String tmpFileTemplate = "tmp-r-00000";
+ String tmpFile = tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr;
+
+ if (fs.exists(new Path(outputPath.toString() + "/" + tmpFile))) {
+ fs.rename(new Path(outputPath.toString() + "/" + tmpFile), new Path(outputPath.toString() + "/" + taskNumberStr));
+ }
}
}