You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/15 16:54:55 UTC
flink git commit: [FLINK-7490] [table] Use correct classloader to
compile generated code that calls UDAGGs.
Repository: flink
Updated Branches:
refs/heads/master 54eeccfe1 -> 59df4b75f
[FLINK-7490] [table] Use correct classloader to compile generated code that calls UDAGGs.
This closes #5018.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59df4b75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59df4b75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59df4b75
Branch: refs/heads/master
Commit: 59df4b75fa9c6754c89ea1922e05cfdb22e761da
Parents: 54eeccf
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Nov 15 10:12:55 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Nov 15 17:49:41 2017 +0100
----------------------------------------------------------------------
.../operators/translation/RichCombineToGroupCombineWrapper.java | 1 +
.../flink/table/runtime/aggregate/AggregateAggFunction.scala | 2 +-
.../apache/flink/table/runtime/aggregate/DataSetAggFunction.scala | 2 +-
.../flink/table/runtime/aggregate/DataSetFinalAggFunction.scala | 2 +-
.../flink/table/runtime/aggregate/DataSetPreAggFunction.scala | 2 +-
.../aggregate/DataSetSessionWindowAggReduceGroupFunction.scala | 2 +-
.../aggregate/DataSetSessionWindowAggregatePreProcessor.scala | 2 +-
.../aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala | 2 +-
.../aggregate/DataSetSlideWindowAggReduceCombineFunction.scala | 2 +-
.../aggregate/DataSetSlideWindowAggReduceGroupFunction.scala | 2 +-
.../aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala | 2 +-
.../DataSetTumbleTimeWindowAggReduceCombineFunction.scala | 2 +-
.../aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala | 2 +-
13 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
index 3f6463a..9cbda50 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
@@ -42,6 +42,7 @@ public class RichCombineToGroupCombineWrapper<IN, OUT, F extends RichGroupReduce
@Override
public void open(Configuration config) throws Exception {
+ wrappedFunction.setRuntimeContext(getRuntimeContext());
wrappedFunction.open(config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 330386b..4dbaeea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -70,7 +70,7 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ Thread.currentThread().getContextClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
index bc0c163..ced1450 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
@@ -46,7 +46,7 @@ class DataSetAggFunction(
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
index 3b3be70..f2eb3d9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
@@ -47,7 +47,7 @@ class DataSetFinalAggFunction(
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
index fc3366b..744a739 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
@@ -48,7 +48,7 @@ class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction)
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index 372fc0d..0d54de6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -72,7 +72,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index 666bfee..35e8142 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -59,7 +59,7 @@ class DataSetSessionWindowAggregatePreProcessor(
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
index 3af7969..f2987a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
@@ -68,7 +68,7 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
index 2da838f..6a9d631 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
@@ -67,7 +67,7 @@ class DataSetSlideWindowAggReduceCombineFunction(
LOG.debug(s"Compiling AggregateHelper: $genPreAggregations.name \n\n " +
s"Code:\n$genPreAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genPreAggregations.name,
genPreAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index 474a09b..f96e841 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -63,7 +63,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index 22fe389..f4d347a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -50,7 +50,7 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
index 9eeab33..a3a72ae 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
@@ -66,7 +66,7 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
LOG.debug(s"Compiling AggregateHelper: $genPreAggregations.name \n\n " +
s"Code:\n$genPreAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genPreAggregations.name,
genPreAggregations.code)
LOG.debug("Instantiating AggregateHelper.")
http://git-wip-us.apache.org/repos/asf/flink/blob/59df4b75/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 4e92148..14e89ad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -62,7 +62,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
s"Code:\n$genAggregations.code")
val clazz = compile(
- getClass.getClassLoader,
+ getRuntimeContext.getUserCodeClassLoader,
genAggregations.name,
genAggregations.code)
LOG.debug("Instantiating AggregateHelper.")