You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/29 12:45:50 UTC

[flink] branch master updated: [FLINK-13458][table] ThreadLocalCache clashes for Blink planner

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 14afeea  [FLINK-13458][table] ThreadLocalCache clashes for Blink planner
14afeea is described below

commit 14afeea06dcd9cb90dfaf471156b34a14380038b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 09:32:48 2019 +0200

    [FLINK-13458][table] ThreadLocalCache clashes for Blink planner
    
    This closes #9257.
---
 .../flink/table/utils}/ThreadLocalCache.java       | 10 +++--
 .../runtime/functions/DateTimeFunctions.scala      |  1 +
 .../table/runtime/functions/ThreadLocalCache.scala | 49 ----------------------
 .../table/runtime/functions/SqlDateTimeUtils.java  |  1 +
 .../table/runtime/functions/SqlFunctionUtils.java  |  1 +
 5 files changed, 10 insertions(+), 52 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
similarity index 90%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
index 39de4df..ca47ab0 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
@@ -16,15 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.functions;
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
- * Provides a ThreadLocal cache with a maximum cache size per thread.
- * Values must not be null.
+ * Provides a thread local cache with a maximum cache size per thread.
+ *
+ * <p>Note: Values must not be null.
  */
+@Internal
 public abstract class ThreadLocalCache<K, V> {
 
 	private static final int DEFAULT_CACHE_SIZE = 64;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
index d69a5c9..0bde983 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.runtime.functions
 
+import org.apache.flink.table.utils.ThreadLocalCache
 import org.joda.time.format.DateTimeFormatter
 import org.joda.time.format.DateTimeFormatterBuilder
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
deleted file mode 100644
index b3a8d7a..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.functions
-
-import java.util.{LinkedHashMap => JLinkedHashMap}
-import java.util.{Map => JMap}
-
-/**
-  * Provides a ThreadLocal cache with a maximum cache size per thread.
-  * Values must not be null.
-  */
-abstract class ThreadLocalCache[K, V](val maxSizePerThread: Int) {
-  private val cache = new ThreadLocal[BoundedMap[K, V]]
-
-  protected def getNewInstance(key: K): V
-
-  def get(key: K): V = {
-    var m = cache.get
-    if (m == null) {
-      m = new BoundedMap(maxSizePerThread)
-      cache.set(m)
-    }
-    var v = m.get(key)
-    if (v == null) {
-      v = getNewInstance(key)
-      m.put(key, v)
-    }
-    v
-  }
-}
-
-private class BoundedMap[K, V](val maxSize: Int) extends JLinkedHashMap[K,V] {
-  override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size > maxSize
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
index 118dca7..3cd1246 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.dataformat.Decimal;
 import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.utils.ThreadLocalCache;
 
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnit;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index 2e00fa1..6ae9b8b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.dataformat.BinaryStringUtil;
 import org.apache.flink.table.dataformat.Decimal;
 import org.apache.flink.table.runtime.util.JsonUtils;
 import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.table.utils.ThreadLocalCache;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;