You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/15 16:54:55 UTC

flink git commit: [FLINK-7490] [table] Use correct classloader to compile generated code that calls UDAGGs.

Repository: flink
Updated Branches:
  refs/heads/master 54eeccfe1 -> 59df4b75f


[FLINK-7490] [table] Use correct classloader to compile generated code that calls UDAGGs.

This closes #5018.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59df4b75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59df4b75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59df4b75

Branch: refs/heads/master
Commit: 59df4b75fa9c6754c89ea1922e05cfdb22e761da
Parents: 54eeccf
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Nov 15 10:12:55 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Nov 15 17:49:41 2017 +0100

----------------------------------------------------------------------
 .../operators/translation/RichCombineToGroupCombineWrapper.java    | 1 +
 .../flink/table/runtime/aggregate/AggregateAggFunction.scala       | 2 +-
 .../apache/flink/table/runtime/aggregate/DataSetAggFunction.scala  | 2 +-
 .../flink/table/runtime/aggregate/DataSetFinalAggFunction.scala    | 2 +-
 .../flink/table/runtime/aggregate/DataSetPreAggFunction.scala      | 2 +-
 .../aggregate/DataSetSessionWindowAggReduceGroupFunction.scala     | 2 +-
 .../aggregate/DataSetSessionWindowAggregatePreProcessor.scala      | 2 +-
 .../aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala   | 2 +-
 .../aggregate/DataSetSlideWindowAggReduceCombineFunction.scala     | 2 +-
 .../aggregate/DataSetSlideWindowAggReduceGroupFunction.scala       | 2 +-
 .../aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala | 2 +-
 .../DataSetTumbleTimeWindowAggReduceCombineFunction.scala          | 2 +-
 .../aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala  | 2 +-
 13 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
index 3f6463a..9cbda50 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
@@ -42,6 +42,7 @@ public class RichCombineToGroupCombineWrapper<IN, OUT, F extends RichGroupReduce
 
 	@Override
 	public void open(Configuration config) throws Exception {
+		wrappedFunction.setRuntimeContext(getRuntimeContext());
 		wrappedFunction.open(config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 330386b..4dbaeea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -70,7 +70,7 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      Thread.currentThread().getContextClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
index bc0c163..ced1450 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
@@ -46,7 +46,7 @@ class DataSetAggFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
index 3b3be70..f2eb3d9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
@@ -47,7 +47,7 @@ class DataSetFinalAggFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
index fc3366b..744a739 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
@@ -48,7 +48,7 @@ class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction)
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index 372fc0d..0d54de6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -72,7 +72,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index 666bfee..35e8142 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -59,7 +59,7 @@ class DataSetSessionWindowAggregatePreProcessor(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
index 3af7969..f2987a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
@@ -68,7 +68,7 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
index 2da838f..6a9d631 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
@@ -67,7 +67,7 @@ class DataSetSlideWindowAggReduceCombineFunction(
     LOG.debug(s"Compiling AggregateHelper: $genPreAggregations.name \n\n " +
       s"Code:\n$genPreAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genPreAggregations.name,
       genPreAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index 474a09b..f96e841 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -63,7 +63,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index 22fe389..f4d347a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -50,7 +50,7 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
index 9eeab33..a3a72ae 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
@@ -66,7 +66,7 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
     LOG.debug(s"Compiling AggregateHelper: $genPreAggregations.name \n\n " +
       s"Code:\n$genPreAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genPreAggregations.name,
       genPreAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 4e92148..14e89ad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -62,7 +62,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")