You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/30 15:06:51 UTC
[1/2] flink git commit: [cascading] load user classloader when configuring InputFormat
Repository: flink
Updated Branches:
refs/heads/master 73af89114 -> ca6dd4275
[cascading] load user classloader when configuring InputFormat
This closes #950.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca6dd427
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca6dd427
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca6dd427
Branch: refs/heads/master
Commit: ca6dd42759f180779f9dfef63535f297fcb2eaf0
Parents: 466f9bf
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Jul 29 14:51:14 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 15:06:06 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/jobgraph/InputFormatVertex.java | 8 +++++++-
.../org/apache/flink/runtime/operators/DataSinkTask.java | 8 +++++++-
.../org/apache/flink/runtime/operators/DataSourceTask.java | 8 +++++++-
3 files changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 011850c..781108c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -71,14 +71,20 @@ public class InputFormatVertex extends JobVertex {
catch (Throwable t) {
throw new Exception("Instantiating the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
-
+
+ Thread thread = Thread.currentThread();
+ ClassLoader original = thread.getContextClassLoader();
// configure
try {
+ thread.setContextClassLoader(loader);
inputFormat.configure(cfg.getStubParameters());
}
catch (Throwable t) {
throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
}
+ finally {
+ thread.setContextClassLoader(original);
+ }
setInputSplitSource(inputFormat);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index b3130a1..d291b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -323,15 +323,21 @@ public class DataSinkTask<IT> extends AbstractInvokable {
catch (ClassCastException ccex) {
throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
}
-
+
+ Thread thread = Thread.currentThread();
+ ClassLoader original = thread.getContextClassLoader();
// configure the stub. catch exceptions here extra, to report them as originating from the user code
try {
+ thread.setContextClassLoader(userCodeClassLoader);
this.format.configure(this.config.getStubParameters());
}
catch (Throwable t) {
throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: "
+ t.getMessage(), t);
}
+ finally {
+ thread.setContextClassLoader(original);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 3f1c642..df81408 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -266,13 +266,19 @@ public class DataSourceTask<OT> extends AbstractInvokable {
ccex);
}
- // configure the stub. catch exceptions here extra, to report them as originating from the user code
+ Thread thread = Thread.currentThread();
+ ClassLoader original = thread.getContextClassLoader();
+ // configure the stub. catch exceptions here extra, to report them as originating from the user code
try {
+ thread.setContextClassLoader(userCodeClassLoader);
this.format.configure(this.config.getStubParameters());
}
catch (Throwable t) {
throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
}
+ finally {
+ thread.setContextClassLoader(original);
+ }
// get the factory for the type serializer
this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);
[2/2] flink git commit: [cascading] add getJobConf() to HadoopInputSplit
Posted by mx...@apache.org.
[cascading] add getJobConf() to HadoopInputSplit
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/466f9bf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/466f9bf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/466f9bf9
Branch: refs/heads/master
Commit: 466f9bf9c98b05bf35ca29af1b04343b7c9ea5f6
Parents: 73af891
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Jul 17 00:31:09 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 15:06:06 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/466f9bf9/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
index d949dfd..15f94b2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
@@ -80,6 +80,10 @@ public class HadoopInputSplit extends LocatableInputSplit {
return hadoopInputSplit;
}
+ public JobConf getJobConf() {
+ return this.jobConf;
+ }
+
// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------