You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/30 15:06:51 UTC

[1/2] flink git commit: [cascading] load user classloader when configuring InputFormat

Repository: flink
Updated Branches:
  refs/heads/master 73af89114 -> ca6dd4275


[cascading] load user classloader when configuring InputFormat

This closes #950.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca6dd427
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca6dd427
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca6dd427

Branch: refs/heads/master
Commit: ca6dd42759f180779f9dfef63535f297fcb2eaf0
Parents: 466f9bf
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Jul 29 14:51:14 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 15:06:06 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobgraph/InputFormatVertex.java | 8 +++++++-
 .../org/apache/flink/runtime/operators/DataSinkTask.java     | 8 +++++++-
 .../org/apache/flink/runtime/operators/DataSourceTask.java   | 8 +++++++-
 3 files changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 011850c..781108c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -71,14 +71,20 @@ public class InputFormatVertex extends JobVertex {
 		catch (Throwable t) {
 			throw new Exception("Instantiating the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
 		}
-		
+
+		Thread thread = Thread.currentThread();
+		ClassLoader original = thread.getContextClassLoader();
 		// configure
 		try {
+			thread.setContextClassLoader(loader);
 			inputFormat.configure(cfg.getStubParameters());
 		}
 		catch (Throwable t) {
 			throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
 		}
+		finally {
+			thread.setContextClassLoader(original);
+		}
 		
 		setInputSplitSource(inputFormat);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index b3130a1..d291b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -323,15 +323,21 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		catch (ClassCastException ccex) {
 			throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
 		}
-		
+
+		Thread thread = Thread.currentThread();
+		ClassLoader original = thread.getContextClassLoader();
 		// configure the stub. catch exceptions here extra, to report them as originating from the user code 
 		try {
+			thread.setContextClassLoader(userCodeClassLoader);
 			this.format.configure(this.config.getStubParameters());
 		}
 		catch (Throwable t) {
 			throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: " 
 				+ t.getMessage(), t);
 		}
+		finally {
+			thread.setContextClassLoader(original);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 3f1c642..df81408 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -266,13 +266,19 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 					ccex);
 		}
 
-		// configure the stub. catch exceptions here extra, to report them as originating from the user code 
+		Thread thread = Thread.currentThread();
+		ClassLoader original = thread.getContextClassLoader();
+		// configure the stub. catch exceptions here extra, to report them as originating from the user code
 		try {
+			thread.setContextClassLoader(userCodeClassLoader);
 			this.format.configure(this.config.getStubParameters());
 		}
 		catch (Throwable t) {
 			throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
 		}
+		finally {
+			thread.setContextClassLoader(original);
+		}
 
 		// get the factory for the type serializer
 		this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);


[2/2] flink git commit: [cascading] add getJobConf() to HadoopInputSplit

Posted by mx...@apache.org.
[cascading] add getJobConf() to HadoopInputSplit


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/466f9bf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/466f9bf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/466f9bf9

Branch: refs/heads/master
Commit: 466f9bf9c98b05bf35ca29af1b04343b7c9ea5f6
Parents: 73af891
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Jul 17 00:31:09 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 15:06:06 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java   | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/466f9bf9/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
index d949dfd..15f94b2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
@@ -80,6 +80,10 @@ public class HadoopInputSplit extends LocatableInputSplit {
 		return hadoopInputSplit;
 	}
 
+	public JobConf getJobConf() {
+		return this.jobConf;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Serialization
 	// ------------------------------------------------------------------------