You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/29 12:45:50 UTC
[flink] branch master updated: [FLINK-13458][table] ThreadLocalCache clashes for Blink planner
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 14afeea [FLINK-13458][table] ThreadLocalCache clashes for Blink planner
14afeea is described below
commit 14afeea06dcd9cb90dfaf471156b34a14380038b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 09:32:48 2019 +0200
[FLINK-13458][table] ThreadLocalCache clashes for Blink planner
This closes #9257.
---
.../flink/table/utils}/ThreadLocalCache.java | 10 +++--
.../runtime/functions/DateTimeFunctions.scala | 1 +
.../table/runtime/functions/ThreadLocalCache.scala | 49 ----------------------
.../table/runtime/functions/SqlDateTimeUtils.java | 1 +
.../table/runtime/functions/SqlFunctionUtils.java | 1 +
5 files changed, 10 insertions(+), 52 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
similarity index 90%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
index 39de4df..ca47ab0 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
@@ -16,15 +16,19 @@
* limitations under the License.
*/
-package org.apache.flink.table.runtime.functions;
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
import java.util.LinkedHashMap;
import java.util.Map;
/**
- * Provides a ThreadLocal cache with a maximum cache size per thread.
- * Values must not be null.
+ * Provides a thread local cache with a maximum cache size per thread.
+ *
+ * <p>Note: Values must not be null.
*/
+@Internal
public abstract class ThreadLocalCache<K, V> {
private static final int DEFAULT_CACHE_SIZE = 64;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
index d69a5c9..0bde983 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.runtime.functions
+import org.apache.flink.table.utils.ThreadLocalCache
import org.joda.time.format.DateTimeFormatter
import org.joda.time.format.DateTimeFormatterBuilder
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
deleted file mode 100644
index b3a8d7a..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.functions
-
-import java.util.{LinkedHashMap => JLinkedHashMap}
-import java.util.{Map => JMap}
-
-/**
- * Provides a ThreadLocal cache with a maximum cache size per thread.
- * Values must not be null.
- */
-abstract class ThreadLocalCache[K, V](val maxSizePerThread: Int) {
- private val cache = new ThreadLocal[BoundedMap[K, V]]
-
- protected def getNewInstance(key: K): V
-
- def get(key: K): V = {
- var m = cache.get
- if (m == null) {
- m = new BoundedMap(maxSizePerThread)
- cache.set(m)
- }
- var v = m.get(key)
- if (v == null) {
- v = getNewInstance(key)
- m.put(key, v)
- }
- v
- }
-}
-
-private class BoundedMap[K, V](val maxSize: Int) extends JLinkedHashMap[K,V] {
- override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size > maxSize
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
index 118dca7..3cd1246 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.utils.ThreadLocalCache;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnit;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index 2e00fa1..6ae9b8b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.dataformat.BinaryStringUtil;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.runtime.util.JsonUtils;
import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.table.utils.ThreadLocalCache;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;