Posted to reviews@spark.apache.org by "liuzqt (via GitHub)" <gi...@apache.org> on 2023/04/27 21:02:50 UTC

[GitHub] [spark] liuzqt opened a new pull request, #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

liuzqt opened a new pull request, #40982:
URL: https://github.com/apache/spark/pull/40982

   
   ### What changes were proposed in this pull request?
   
   Create `NonFateSharingCache`, a wrapper around a Guava cache that uses a `KeyLock` to synchronize all requests for the same key, so that they run one at a time and each one fails independently, as if they had arrived sequentially.
   
   Wrap the cache in `CodeGenerator` as an example. Feel free to use this wherever we use a Guava cache and do not want fate-sharing behavior.
   
   Also, instead of implementing the Guava `Cache` and `LoadingCache` interfaces, let's define our own narrower APIs so that we can control at compile time which cache operations are allowed and make sure all cache loading actions go through our narrow-waist code path with the key lock. Feel free to add new APIs when needed.
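   
   The per-key locking idea can be illustrated with a minimal, standard-library-only sketch. This is not the class added by this PR: `KeyLockedCache` and `lockFor` are hypothetical names, a `ConcurrentHashMap` stands in for the Guava cache, and a map of plain lock objects stands in for Spark's `KeyLock`. The only point is that a failed load is reported to the caller that ran it, and the next caller for the same key runs its own load.
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   // Hypothetical illustration only; not the NonFateSharingCache from this PR.
   class KeyLockedCache[K, V <: AnyRef] {
     private val values = new ConcurrentHashMap[K, V]()
     private val locks = new ConcurrentHashMap[K, Object]()
   
     // One lock object per key. Locks are never removed in this sketch.
     private def lockFor(key: K): Object = {
       val fresh = new Object
       val existing = locks.putIfAbsent(key, fresh)
       if (existing != null) existing else fresh
     }
   
     // Returns the cached value, or runs `load` while holding the key's lock.
     def get(key: K)(load: => V): V =
       Option(values.get(key)).getOrElse {
         lockFor(key).synchronized {
           // Re-check: another thread may have loaded the value while we waited.
           Option(values.get(key)).getOrElse {
             val loaded = load // an exception here reaches only this caller
             values.put(key, loaded)
             loaded
           }
         }
       }
   }
   
   object KeyLockedCacheExample {
     def main(args: Array[String]): Unit = {
       val cache = new KeyLockedCache[String, String]
       // The first caller's load fails; nothing is cached.
       val first = scala.util.Try(cache.get("key")(throw new RuntimeException("loading failed")))
       // The second caller runs its own load and succeeds.
       val second = cache.get("key")("value")
       println(s"first failed: ${first.isFailure}, second: $second")
     }
   }
   ```
   
   Callers that wait on the lock re-check the map before loading, so a successful load is still shared, while a failed one is not.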
   
   ### Why are the changes needed?
   
   Guava cache is widely used in Spark. However, it suffers from fate-sharing behavior: if multiple requests try to access the same key at the same time while the key is not in the cache, Guava cache blocks all of them and creates the object only once. If the creation fails, all requests fail immediately without retrying. As a result, a task can fail because of an unrelated failure in another query.
   
   This fate-sharing behavior might lead to unexpected results in some situations.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   
   ### How was this patch tested?
   Unit tests (`NonFateSharingCacheSuite`).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] liuzqt commented on pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "liuzqt (via GitHub)" <gi...@apache.org>.
liuzqt commented on PR #40982:
URL: https://github.com/apache/spark/pull/40982#issuecomment-1546113299

   Hi @ryan-johnson-databricks would you mind triggering the merge for this PR?




[GitHub] [spark] liuzqt commented on a diff in pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "liuzqt (via GitHub)" <gi...@apache.org>.
liuzqt commented on code in PR #40982:
URL: https://github.com/apache/spark/pull/40982#discussion_r1191657688


##########
core/src/test/scala/org/apache/spark/util/NonFateSharingCacheSuite.scala:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference
+
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.CacheLoader
+
+import org.apache.spark.SparkFunSuite
+
+object NonFateSharingCacheSuite {
+  private val TEST_KEY = "key"
+  private val FAIL_MESSAGE = "loading failed"
+  private val THREAD2_HOLDER = new AtomicReference[Thread](null)
+
+  class TestCacheLoader extends CacheLoader[String, String] {
+    var intentionalFail: ThreadLocal[Boolean] = ThreadLocal.withInitial(() => false)
+    var startLoading = new Semaphore(0)
+
+    def waitUntilThread2Waiting(): Unit = {
+      while (true) {
+        Thread.sleep(100)
+        val t2 = THREAD2_HOLDER.get()
+        if (t2 != null && t2.getState.equals(Thread.State.WAITING)) {
+          return
+        }
+      }
+    }
+
+    override def load(key: String): String = {
+      startLoading.release()
+      if (Thread.currentThread().getName.contains("test-executor1")) {
+        waitUntilThread2Waiting()
+      }
+      if (intentionalFail.get) throw new RuntimeException(FAIL_MESSAGE)
+      key
+    }
+  }
+}
+
+/**
+ * Test non-fate-sharing behavior
+ */
+class NonFateSharingCacheSuite extends SparkFunSuite {
+
+  type WorkerFunc = () => Unit
+
+  import NonFateSharingCacheSuite._
+
+  test("test LoadingCache") {
+    val loader = new TestCacheLoader
+    val loadingCache: NonFateSharingLoadingCache[String, String] =
+      NonFateSharingCache(CacheBuilder.newBuilder.build(loader))
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      loadingCache.get(TEST_KEY)
+    }
+    val thread2Task: WorkerFunc = () => {
+      loadingCache.get(TEST_KEY)
+    }
+    testImpl(loadingCache, loader, thread1Task, thread2Task)
+  }
+
+  test("test LoadingCache mix usage of default loader and provided loader") {
+    // Intentionally mix usage of default loader and provided value loader.
+    val loader = new TestCacheLoader
+    val loadingCache: NonFateSharingLoadingCache[String, String] =
+      NonFateSharingCache(CacheBuilder.newBuilder.build(loader))
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      loadingCache.get(
+        TEST_KEY,
+        () => loader.load(TEST_KEY)
+      )
+    }
+    val thread2Task: WorkerFunc = () => {
+      loadingCache.get(TEST_KEY)
+    }
+    testImpl(loadingCache, loader, thread1Task, thread2Task)
+  }
+
+  test("test Cache") {
+    val loader = new TestCacheLoader
+    val cache = NonFateSharingCache(CacheBuilder.newBuilder.build[String, String])
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      cache.get(
+        TEST_KEY,
+        () => loader.load(TEST_KEY)
+      )
+    }
+    val thread2Task: WorkerFunc = () => {
+      cache.get(
+        TEST_KEY,
+        () => loader.load(TEST_KEY)
+      )
+    }
+    testImpl(cache, loader, thread1Task, thread2Task)
+  }
+
+  def testImpl(
+    cache: NonFateSharingCache[String, String],
+    loader: TestCacheLoader,
+    thread1Task: WorkerFunc,
+    thread2Task: WorkerFunc): Unit = {
+    val executor1 = ThreadUtils.newDaemonSingleThreadExecutor("test-executor1")
+    val executor2 = ThreadUtils.newDaemonSingleThreadExecutor("test-executor2")
+    val f1 = executor1.submit(new Runnable {
+      override def run(): Unit = {
+        thread1Task()
+      }
+    })
+    val f2 = executor2.submit(new Runnable {
+      override def run(): Unit = {
+        loader.startLoading.acquire() // wait until thread1 start loading
+        THREAD2_HOLDER.set(Thread.currentThread())
+        thread2Task()
+      }
+    })

Review Comment:
   I was hitting an overload resolution issue:
   ```
   ambiguous reference to overloaded definition,
   [error] both method submit in class AbstractExecutorService of type [T](x$1: java.util.concurrent.Callable[T])java.util.concurrent.Future[T]
   [error] and  method submit in class AbstractExecutorService of type (x$1: Runnable)java.util.concurrent.Future[_]
   
   ```
   But I can do 
   ```
   val r1: Runnable = () => thread1Task()
   val f1 = executor1.submit(r1)
   ```
   instead
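   
   A self-contained sketch of that workaround, assuming Scala 2.12 or later (where a function literal can be converted to a Java functional interface). The object `RunnableSubmitSketch` and the method `runAndWait` are hypothetical names used only for illustration; the test in this PR uses `ThreadUtils.newDaemonSingleThreadExecutor` instead of `Executors`.
   
   ```scala
   import java.util.concurrent.{Executors, TimeUnit}
   
   object RunnableSubmitSketch {
     // Runs `task` on a fresh single-thread executor and waits for it to finish.
     def runAndWait(task: () => Unit): Unit = {
       val executor = Executors.newSingleThreadExecutor()
       try {
         // Declaring the value as Runnable selects the submit(Runnable) overload,
         // so the compiler no longer has to choose between Runnable and Callable.
         val runnable: Runnable = () => task()
         executor.submit(runnable).get(10, TimeUnit.SECONDS)
         ()
       } finally {
         executor.shutdown()
       }
     }
   }
   ```
   
   The ambiguity only shows up when the lambda is passed directly to `submit`; once the argument has a declared static type, overload resolution is straightforward.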
   





[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40982:
URL: https://github.com/apache/spark/pull/40982#discussion_r1191590888


##########
core/src/test/scala/org/apache/spark/util/NonFateSharingCacheSuite.scala:
##########
@@ -0,0 +1,147 @@
+  def testImpl(
+    cache: NonFateSharingCache[String, String],
+    loader: TestCacheLoader,
+    thread1Task: WorkerFunc,
+    thread2Task: WorkerFunc): Unit = {
+    val executor1 = ThreadUtils.newDaemonSingleThreadExecutor("test-executor1")
+    val executor2 = ThreadUtils.newDaemonSingleThreadExecutor("test-executor2")
+    val f1 = executor1.submit(new Runnable {
+      override def run(): Unit = {
+        thread1Task()

Review Comment:
   nit: can be a one-liner:
   ```scala
   override def run(): Unit = thread1Task()
   ```
   (but first see runnable interface comment below)



##########
core/src/test/scala/org/apache/spark/util/NonFateSharingCacheSuite.scala:
##########
@@ -0,0 +1,147 @@
+  def testImpl(
+    cache: NonFateSharingCache[String, String],
+    loader: TestCacheLoader,
+    thread1Task: WorkerFunc,
+    thread2Task: WorkerFunc): Unit = {

Review Comment:
   nit: indent these 4 spaces?



##########
core/src/test/scala/org/apache/spark/util/NonFateSharingCacheSuite.scala:
##########
@@ -0,0 +1,147 @@
+  test("test LoadingCache mix usage of default loader and provided loader") {
+    // Intentionally mix usage of default loader and provided value loader.
+    val loader = new TestCacheLoader
+    val loadingCache: NonFateSharingLoadingCache[String, String] =
+      NonFateSharingCache(CacheBuilder.newBuilder.build(loader))
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      loadingCache.get(
+        TEST_KEY,
+        () => loader.load(TEST_KEY)

Review Comment:
   nit: any particular reason for the line breaks?
   ```scala
   loadingCache.get(TEST_KEY, () => loader.load(TEST_KEY))
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -1577,7 +1577,7 @@ object CodeGenerator extends Logging {
    * automatically, in order to constrain its memory footprint.  Note that this cache does not use
    * weak keys/values and thus does not respond to memory pressure.
    */
-  private val cache = CacheBuilder.newBuilder()
+  private val cache = NonFateSharingCache(CacheBuilder.newBuilder()

Review Comment:
   I don't see any comment explaining why codegen needs a non fate sharing cache? What bug does it fix?



##########
core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Callable
+
+import com.google.common.cache.Cache
+import com.google.common.cache.LoadingCache
+
+/**
+ * SPARK-43300: Guava cache fate-sharing behavior might lead to unexpected cascade failure:
+ * when multiple threads access the same key in the cache at the same time when the key is not in
+ * the cache, Guava cache will block all requests and load the data only once. If the loading fails,
+ * all requests will fail immediately without retry. Therefore individual failure will also fail
+ * other irrelevant queries who are waiting for the same key.
+ *
+ * This util create a delegation Cache with KeyLock to synchronize threads looking for the same key
+ * so that they should run individually and fail as if they had arrived one at a time.
+ *
+ * Instead of implementing Guava Cache and LoadingCache interface, we defined our own narrower APIs
+ * so that we can control at compile time what cache operations are allowed. Feel free to add new
+ * APIs when needed.

Review Comment:
   I don't think the "feel free to..." comment is especially helpful... can probably just remove it.



##########
core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala:
##########
@@ -0,0 +1,72 @@
+object NonFateSharingCache {
+  def apply[K, V](cache: Cache[K, V]): NonFateSharingCache[K, V] = cache match {

Review Comment:
   Worth a quick doc comment to explain that this method will return a `NonFateSharingLoadingCache` if the user happens to pass a `LoadingCache`, as a courtesy to the user?
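   
   The dispatch described here can be sketched with stand-in types. Everything below is hypothetical: `SimpleCache` and `SimpleLoadingCache` stand in for Guava's `Cache` and `LoadingCache`, and `Wrapped` and `WrappedLoading` stand in for the two wrapper classes. It only shows how an `apply` that accepts the general type can still hand back the more specific wrapper, together with the kind of doc comment being asked for.
   
   ```scala
   // Hypothetical stand-ins for Guava's Cache and LoadingCache.
   trait SimpleCache[K, V]
   trait SimpleLoadingCache[K, V] extends SimpleCache[K, V]
   
   // Hypothetical stand-ins for the two wrapper classes.
   class Wrapped[K, V](cache: SimpleCache[K, V])
   class WrappedLoading[K, V](cache: SimpleLoadingCache[K, V]) extends Wrapped[K, V](cache)
   
   object Wrapped {
     /**
      * Wraps `cache`. If the argument is a loading cache at runtime, the result
      * is a WrappedLoading even though the static return type is Wrapped.
      */
     def apply[K, V](cache: SimpleCache[K, V]): Wrapped[K, V] = cache match {
       case loading: SimpleLoadingCache[K, V] => apply(loading)
       case _ => new Wrapped(cache)
     }
   
     /** Wraps a loading cache and keeps the more specific static type. */
     def apply[K, V](cache: SimpleLoadingCache[K, V]): WrappedLoading[K, V] =
       new WrappedLoading(cache)
   }
   ```
   
   Callers holding a statically typed loading cache get the second overload directly; the pattern match only matters when the static type has been widened.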



##########
core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala:
##########
@@ -0,0 +1,72 @@
+/**
+ * SPARK-43300: Guava cache fate-sharing behavior might lead to unexpected cascade failure:
+ * when multiple threads access the same key in the cache at the same time when the key is not in
+ * the cache, Guava cache will block all requests and load the data only once. If the loading fails,
+ * all requests will fail immediately without retry. Therefore individual failure will also fail
+ * other irrelevant queries who are waiting for the same key.
+ *
+ * This util create a delegation Cache with KeyLock to synchronize threads looking for the same key
+ * so that they should run individually and fail as if they had arrived one at a time.
+ *
+ * Instead of implementing Guava Cache and LoadingCache interface, we defined our own narrower APIs
+ * so that we can control at compile time what cache operations are allowed. Feel free to add new
+ * APIs when needed.

Review Comment:
   Meanwhile, it would probably be helpful to explain _why_ we don't/can't/won't expose the full Guava cache API?



##########
core/src/test/scala/org/apache/spark/util/NonFateSharingCacheSuite.scala:
##########
@@ -0,0 +1,147 @@
+  def testImpl(
+    cache: NonFateSharingCache[String, String],
+    loader: TestCacheLoader,
+    thread1Task: WorkerFunc,
+    thread2Task: WorkerFunc): Unit = {
+    val executor1 = ThreadUtils.newDaemonSingleThreadExecutor("test-executor1")
+    val executor2 = ThreadUtils.newDaemonSingleThreadExecutor("test-executor2")
+    val f1 = executor1.submit(new Runnable {
+      override def run(): Unit = {
+        thread1Task()
+      }
+    })
+    val f2 = executor2.submit(new Runnable {
+      override def run(): Unit = {
+        loader.startLoading.acquire() // wait until thread1 start loading
+        THREAD2_HOLDER.set(Thread.currentThread())
+        thread2Task()
+      }
+    })

Review Comment:
   Double-check, but I'm pretty sure this can be simplified, because Runnable is a functional (single-method) interface:
   ```scala
   val f1 = executor1.submit(() => thread1Task())
   val f2 = executor2.submit { () =>
     ...
   }
   ```
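
A minimal, self-contained sketch of the lambda-based submission suggested above. `RunnableSubmitSketch` and its counter are hypothetical stand-ins for the suite's tasks, and the lambdas are explicitly typed as `Runnable` here so that the `submit(Runnable)` overload is chosen without relying on overload resolution between `Runnable` and `Callable`:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object RunnableSubmitSketch {
  // Runs two tasks on two single-thread executors and returns the final count.
  def runBoth(): Int = {
    val counter = new AtomicInteger(0)
    val executor1 = Executors.newSingleThreadExecutor()
    val executor2 = Executors.newSingleThreadExecutor()
    try {
      // A Scala lambda converts to Runnable because Runnable has one abstract method.
      val task1: Runnable = () => { counter.incrementAndGet(); () }
      val task2: Runnable = () => { counter.addAndGet(10); () }
      val f1 = executor1.submit(task1)
      val f2 = executor2.submit(task2)
      // Wait for both tasks so the result is deterministic.
      f1.get(10, TimeUnit.SECONDS)
      f2.get(10, TimeUnit.SECONDS)
      counter.get()
    } finally {
      executor1.shutdown()
      executor2.shutdown()
    }
  }
}
```

Whether the untyped form `executor.submit(() => ...)` resolves cleanly may depend on the Scala version in use, so the explicit `Runnable` type is the conservative choice.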
   



##########
core/src/test/scala/org/apache/spark/util/NonFateSharingCacheSuite.scala:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference
+
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.CacheLoader
+
+import org.apache.spark.SparkFunSuite
+
+object NonFateSharingCacheSuite {
+  private val TEST_KEY = "key"
+  private val FAIL_MESSAGE = "loading failed"
+  private val THREAD2_HOLDER = new AtomicReference[Thread](null)
+
+  class TestCacheLoader extends CacheLoader[String, String] {
+    var intentionalFail: ThreadLocal[Boolean] = ThreadLocal.withInitial(() => false)
+    var startLoading = new Semaphore(0)
+
+    def waitUntilThread2Waiting(): Unit = {
+      while (true) {
+        Thread.sleep(100)
+        val t2 = THREAD2_HOLDER.get()
+        if (t2 != null && t2.getState.equals(Thread.State.WAITING)) {
+          return
+        }
+      }
+    }
+
+    override def load(key: String): String = {
+      startLoading.release()
+      if (Thread.currentThread().getName.contains("test-executor1")) {
+        waitUntilThread2Waiting()
+      }
+      if (intentionalFail.get) throw new RuntimeException(FAIL_MESSAGE)
+      key
+    }
+  }
+}
+
+/**
+ * Test non-fate-sharing behavior
+ */
+class NonFateSharingCacheSuite extends SparkFunSuite {
+
+  type WorkerFunc = () => Unit
+
+  import NonFateSharingCacheSuite._
+
+  test("test LoadingCache") {
+    val loader = new TestCacheLoader
+    val loadingCache: NonFateSharingLoadingCache[String, String] =
+      NonFateSharingCache(CacheBuilder.newBuilder.build(loader))
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      loadingCache.get(TEST_KEY)
+    }
+    val thread2Task: WorkerFunc = () => {
+      loadingCache.get(TEST_KEY)
+    }
+    testImpl(loadingCache, loader, thread1Task, thread2Task)
+  }
+
+  test("test LoadingCache mix usage of default loader and provided loader") {
+    // Intentionally mix usage of default loader and provided value loader.
+    val loader = new TestCacheLoader
+    val loadingCache: NonFateSharingLoadingCache[String, String] =
+      NonFateSharingCache(CacheBuilder.newBuilder.build(loader))
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      loadingCache.get(
+        TEST_KEY,
+        () => loader.load(TEST_KEY)
+      )
+    }
+    val thread2Task: WorkerFunc = () => {
+      loadingCache.get(TEST_KEY)
+    }
+    testImpl(loadingCache, loader, thread1Task, thread2Task)
+  }
+
+  test("test Cache") {

Review Comment:
   Can we use more descriptive names for the test cases?
   Ideally, each name explains what behavior the test actually verifies.



##########
core/src/test/scala/org/apache/spark/util/NonFateSharingCacheSuite.scala:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference
+
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.CacheLoader
+
+import org.apache.spark.SparkFunSuite
+
+object NonFateSharingCacheSuite {
+  private val TEST_KEY = "key"
+  private val FAIL_MESSAGE = "loading failed"
+  private val THREAD2_HOLDER = new AtomicReference[Thread](null)
+
+  class TestCacheLoader extends CacheLoader[String, String] {
+    var intentionalFail: ThreadLocal[Boolean] = ThreadLocal.withInitial(() => false)
+    var startLoading = new Semaphore(0)
+
+    def waitUntilThread2Waiting(): Unit = {
+      while (true) {
+        Thread.sleep(100)
+        val t2 = THREAD2_HOLDER.get()
+        if (t2 != null && t2.getState.equals(Thread.State.WAITING)) {

Review Comment:
   nit: use Option instead of a null check?
   ```scala
   if (Option(THREAD2_HOLDER.get()).exists(_.getState.equals(Thread.State.WAITING))) {
   ```
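
A small standalone sketch of the `Option`-based check, assuming a holder shaped like `THREAD2_HOLDER`. `OptionNullCheckSketch` and its methods are hypothetical names used only for illustration:

```scala
import java.util.concurrent.atomic.AtomicReference

object OptionNullCheckSketch {
  // Mirrors the shape of THREAD2_HOLDER: starts out holding null.
  private val holder = new AtomicReference[Thread](null)

  def set(t: Thread): Unit = holder.set(t)

  // Option(null) is None, so exists(...) is false until a thread is stored.
  def isInState(state: Thread.State): Boolean =
    Option(holder.get()).exists(_.getState == state)
}
```

`Option(x)` maps a null reference to `None`, so the explicit `t2 != null &&` guard becomes unnecessary.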



##########
core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Callable
+
+import com.google.common.cache.Cache
+import com.google.common.cache.LoadingCache
+
+/**
+ * SPARK-43300: Guava cache fate-sharing behavior might lead to unexpected cascade failure:
+ * when multiple threads access the same key in the cache at the same time when the key is not in
+ * the cache, Guava cache will block all requests and load the data only once. If the loading fails,
+ * all requests will fail immediately without retry. Therefore individual failure will also fail
+ * other irrelevant queries who are waiting for the same key.
+ *
+ * This util create a delegation Cache with KeyLock to synchronize threads looking for the same key
+ * so that they should run individually and fail as if they had arrived one at a time.
+ *
+ * Instead of implementing Guava Cache and LoadingCache interface, we defined our own narrower APIs

Review Comment:
   ```suggestion
    * Instead of implementing Guava Cache and LoadingCache interface, we expose a subset of APIs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen commented on pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on PR #40982:
URL: https://github.com/apache/spark/pull/40982#issuecomment-1548847605

   I've merged this to master (Spark 3.5.0). Thanks @liuzqt!




[GitHub] [spark] JoshRosen commented on a diff in pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #40982:
URL: https://github.com/apache/spark/pull/40982#discussion_r1192877450


##########
core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Callable
+
+import com.google.common.cache.Cache
+import com.google.common.cache.LoadingCache
+
+/**
+ * SPARK-43300: Guava cache fate-sharing behavior might lead to unexpected cascade failure:
+ * when multiple threads access the same key in the cache at the same time when the key is not in
+ * the cache, Guava cache will block all requests and load the data only once. If the loading fails,
+ * all requests will fail immediately without retry. Therefore individual failure will also fail
+ * other irrelevant queries who are waiting for the same key. Given that spark can cancel tasks at
+ * arbitrary times for many different reasons, fate sharing means that a task which gets canceled
+ * while populating a cache entry can cause spurious failures in tasks from unrelated jobs -- even
+ * though those tasks would have successfully populated the cache if they had been allowed to try.
+ *
+ * This util Cache wrapper with KeyLock to synchronize threads looking for the same key
+ * so that they should run individually and fail as if they had arrived one at a time.
+ *
+ * There are so many ways to add cache entries in Guava Cache, instead of implementing Guava Cache
+ * and LoadingCache interface, we expose a subset of APIs so that we can control at compile time
+ * what cache operations are allowed.
+ */
+object NonFateSharingCache {

Review Comment:
   Can we make this `private[spark]` so that user code can't take a dependency on it?
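
For readers unfamiliar with qualified access modifiers, here is a rough sketch of what `private[spark]` does. Nested objects stand in for the real packages so the snippet is self-contained, and every name in it is hypothetical:

```scala
// Nested objects stand in for the packages org.apache.spark.util and
// org.apache.spark.sql; none of these names are Spark's real definitions.
object spark {
  object util {
    // Visible anywhere inside `spark`, hidden from code outside it.
    private[spark] class InternalCache {
      def size: Long = 0L
    }
  }

  object sql {
    // Allowed: this code is enclosed by `spark`.
    def cacheSize: Long = new util.InternalCache().size
  }
}
```

Code outside `spark` that tries `new spark.util.InternalCache()` should be rejected by the compiler, which is the property the review comment asks for.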



##########
core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Callable
+
+import com.google.common.cache.Cache
+import com.google.common.cache.LoadingCache
+
+/**
+ * SPARK-43300: Guava cache fate-sharing behavior might lead to unexpected cascade failure:
+ * when multiple threads access the same key in the cache at the same time when the key is not in
+ * the cache, Guava cache will block all requests and load the data only once. If the loading fails,
+ * all requests will fail immediately without retry. Therefore individual failure will also fail
+ * other irrelevant queries who are waiting for the same key. Given that spark can cancel tasks at
+ * arbitrary times for many different reasons, fate sharing means that a task which gets canceled
+ * while populating a cache entry can cause spurious failures in tasks from unrelated jobs -- even
+ * though those tasks would have successfully populated the cache if they had been allowed to try.
+ *
+ * This util Cache wrapper with KeyLock to synchronize threads looking for the same key
+ * so that they should run individually and fail as if they had arrived one at a time.
+ *
+ * There are so many ways to add cache entries in Guava Cache, instead of implementing Guava Cache
+ * and LoadingCache interface, we expose a subset of APIs so that we can control at compile time
+ * what cache operations are allowed.
+ */
+object NonFateSharingCache {
+  /**
+   * This will return a NonFateSharingLoadingCache instance if user happens to pass a LoadingCache
+   */
+  def apply[K, V](cache: Cache[K, V]): NonFateSharingCache[K, V] = cache match {
+    case loadingCache: LoadingCache[K, V] => apply(loadingCache)
+    case _ => new NonFateSharingCache(cache)
+  }
+
+  def apply[K, V](loadingCache: LoadingCache[K, V]): NonFateSharingLoadingCache[K, V] =
+    new NonFateSharingLoadingCache(loadingCache)
+}
+
+class NonFateSharingCache[K, V](protected val cache: Cache[K, V]) {
+
+  protected val keyLock = new KeyLock[K]
+
+  def get(key: K, valueLoader: Callable[_ <: V]): V = keyLock.withLock(key) {
+    cache.get(key, valueLoader)
+  }
+
+  def getIfPresent(key: Any): V = cache.getIfPresent(key)
+
+  def invalidate(key: Any): Unit = cache.invalidate(key)
+
+  def invalidateAll(): Unit = cache.invalidateAll()
+
+  def size(): Long = cache.size()
+}
+
+class NonFateSharingLoadingCache[K, V](

Review Comment:
   Similarly, `private[spark]` here as well?



##########
core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Callable
+
+import com.google.common.cache.Cache
+import com.google.common.cache.LoadingCache
+
+/**
+ * SPARK-43300: Guava cache fate-sharing behavior might lead to unexpected cascade failure:
+ * when multiple threads access the same key in the cache at the same time when the key is not in
+ * the cache, Guava cache will block all requests and load the data only once. If the loading fails,
+ * all requests will fail immediately without retry. Therefore individual failure will also fail
+ * other irrelevant queries who are waiting for the same key. Given that spark can cancel tasks at
+ * arbitrary times for many different reasons, fate sharing means that a task which gets canceled
+ * while populating a cache entry can cause spurious failures in tasks from unrelated jobs -- even
+ * though those tasks would have successfully populated the cache if they had been allowed to try.
+ *
+ * This util Cache wrapper with KeyLock to synchronize threads looking for the same key
+ * so that they should run individually and fail as if they had arrived one at a time.
+ *
+ * There are so many ways to add cache entries in Guava Cache, instead of implementing Guava Cache
+ * and LoadingCache interface, we expose a subset of APIs so that we can control at compile time
+ * what cache operations are allowed.
+ */
+object NonFateSharingCache {
+  /**
+   * This will return a NonFateSharingLoadingCache instance if user happens to pass a LoadingCache
+   */
+  def apply[K, V](cache: Cache[K, V]): NonFateSharingCache[K, V] = cache match {
+    case loadingCache: LoadingCache[K, V] => apply(loadingCache)
+    case _ => new NonFateSharingCache(cache)
+  }
+
+  def apply[K, V](loadingCache: LoadingCache[K, V]): NonFateSharingLoadingCache[K, V] =
+    new NonFateSharingLoadingCache(loadingCache)
+}
+
+class NonFateSharingCache[K, V](protected val cache: Cache[K, V]) {

Review Comment:
   Similarly, `private[spark]` here as well?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40982:
URL: https://github.com/apache/spark/pull/40982#discussion_r1233412076


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -1576,8 +1576,12 @@ object CodeGenerator extends Logging {
    * they are explicitly removed. A Cache on the other hand is generally configured to evict entries
    * automatically, in order to constrain its memory footprint.  Note that this cache does not use
    * weak keys/values and thus does not respond to memory pressure.
+   *
+   * Codegen can be slow. Use a non fate sharing cache in case a query gets canceled during codegen
+   * while other queries wait on the same code, so that those other queries don't get wrongly
+   * aborted. See [[NonFateSharingCache]] for more details.
    */
-  private val cache = CacheBuilder.newBuilder()
+  private val cache = NonFateSharingCache(CacheBuilder.newBuilder()

Review Comment:
   In that case, I guess we should either move this to the core module or move `NonFateSharingCache` to the catalyst module?
   
   Do you have any better ideas, @LuciferYang?





[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40982:
URL: https://github.com/apache/spark/pull/40982#discussion_r1191678570


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala:
##########
@@ -1576,8 +1576,13 @@ object CodeGenerator extends Logging {
    * they are explicitly removed. A Cache on the other hand is generally configured to evict entries
    * automatically, in order to constrain its memory footprint.  Note that this cache does not use
    * weak keys/values and thus does not respond to memory pressure.
+   *
+   * We wrap Guava Cache with NonFateSharingCache: tasks for similar queries that run concurrently
+   * can hit the same codegen cache key at the same time. If one query fails or gets canceled while
+   * some of its tasks are doing codegen, it will cause spurious failures in all other queries with
+   * tasks that wait on the same cache key. See NonFateSharingCache for more details.

Review Comment:
   If we move some of this to the NonFateSharingCache doc comment, it might suffice to say something simpler here, like:
   > Codegen can be slow. Use a non fate sharing cache in case a query gets canceled during code gen while other queries wait on the same code, so that those other queries don't get wrongly aborted. See [[NonFateSharingCache]] for more details.



##########
core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.Callable
+
+import com.google.common.cache.Cache
+import com.google.common.cache.LoadingCache
+
+/**
+ * SPARK-43300: Guava cache fate-sharing behavior might lead to unexpected cascade failure:
+ * when multiple threads access the same key in the cache at the same time when the key is not in
+ * the cache, Guava cache will block all requests and load the data only once. If the loading fails,
+ * all requests will fail immediately without retry. Therefore individual failure will also fail
+ * other irrelevant queries who are waiting for the same key.

Review Comment:
   Maybe add a final connection like:
   > Given that spark can cancel tasks at arbitrary times for many different reasons, fate sharing means that a task which gets canceled while populating a cache entry can cause spurious failures in tasks from unrelated jobs -- even though those tasks would have successfully populated the cache if they had been allowed to try.
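
The behavior described in the doc comment can be sketched without Guava. The following is an illustrative toy, not the PR's implementation: `PerKeyLockedCache` is a hypothetical name, it uses plain `ConcurrentHashMap`s, and unlike a real key lock it never removes lock objects. It only shows the idea that loads for the same key are serialized, so a waiter runs its own load after an earlier caller fails:

```scala
import java.util.concurrent.{Callable, ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.{ExecutionException, Executors, TimeUnit}

// Toy cache: loads for the same key run one at a time, each caller with its own loader.
class PerKeyLockedCache[K, V <: AnyRef] {
  private val values = new ConcurrentHashMap[K, V]()
  private val locks = new ConcurrentHashMap[K, Object]()

  def get(key: K)(load: => V): V = {
    val fresh = new Object
    val existing = locks.putIfAbsent(key, fresh)
    val lock = if (existing == null) fresh else existing
    lock.synchronized {
      // A failed load stores nothing, so the next caller simply tries again.
      Option(values.get(key)).getOrElse {
        val loaded = load
        values.put(key, loaded)
        loaded
      }
    }
  }
}

object PerKeyLockedCacheDemo {
  // Returns (did the first caller fail?, value seen by the second caller).
  def run(): (Boolean, String) = {
    val cache = new PerKeyLockedCache[String, String]
    val loadStarted = new CountDownLatch(1)
    val letFirstFail = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(2)
    try {
      val first: Callable[String] = () => cache.get("key") {
        loadStarted.countDown()
        letFirstFail.await()
        throw new RuntimeException("loading failed")
      }
      val second: Callable[String] = () => cache.get("key")("value")
      val f1 = pool.submit(first)
      loadStarted.await() // the first caller now holds the lock for "key"
      val f2 = pool.submit(second)
      letFirstFail.countDown()
      val firstFailed =
        try { f1.get(10, TimeUnit.SECONDS); false }
        catch { case _: ExecutionException => true }
      (firstFailed, f2.get(10, TimeUnit.SECONDS))
    } finally {
      pool.shutdown()
    }
  }
}
```

In this sketch the first caller's failure stays with the first caller, while the second caller loads the value itself. That is the "fail as if they had arrived one at a time" property the wrapper aims for.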





[GitHub] [spark] JoshRosen closed pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen closed pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache
URL: https://github.com/apache/spark/pull/40982




[GitHub] [spark] ryan-johnson-databricks commented on pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on PR #40982:
URL: https://github.com/apache/spark/pull/40982#issuecomment-1546130844

   > Hi @ryan-johnson-databricks would you mind triggering the merge for this PR?
   
   Sorry, I'm not a spark committer.




[GitHub] [spark] ryan-johnson-databricks commented on pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on PR #40982:
URL: https://github.com/apache/spark/pull/40982#issuecomment-1545902637

   The PR description seems a bit vague, given that the actual code comments give pretty clear reasoning for why this change is needed.
   
   > Wrap cache in `CodeGenerator` as an example. Feel free to use this in other places where we used Guava cache and don't want fate-sharing behavior.
   
   It's not just an "example" -- the code comments detail a specific bad behavior we're trying to avoid with this change.
   
   > This fate sharing behavior might lead to unexpected results in some situation.
   
   Again, we can be specific: we know it _does_ lead to unexpected results (a query canceled during codegen causes spurious failures in other queries).
   




[GitHub] [spark] liuzqt commented on pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "liuzqt (via GitHub)" <gi...@apache.org>.
liuzqt commented on PR #40982:
URL: https://github.com/apache/spark/pull/40982#issuecomment-1548842285

   Hi @JoshRosen would you mind merging this PR? Or maybe @cloud-fan could help?




[GitHub] [spark] liuzqt closed pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "liuzqt (via GitHub)" <gi...@apache.org>.
liuzqt closed pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache
URL: https://github.com/apache/spark/pull/40982




[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40982:
URL: https://github.com/apache/spark/pull/40982#discussion_r1191670654


##########
core/src/test/scala/org/apache/spark/util/NonFateSharingCacheSuite.scala:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference
+
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.CacheLoader
+
+import org.apache.spark.SparkFunSuite
+
+object NonFateSharingCacheSuite {
+  private val TEST_KEY = "key"
+  private val FAIL_MESSAGE = "loading failed"
+  private val THREAD2_HOLDER = new AtomicReference[Thread](null)
+
+  class TestCacheLoader extends CacheLoader[String, String] {
+    var intentionalFail: ThreadLocal[Boolean] = ThreadLocal.withInitial(() => false)
+    var startLoading = new Semaphore(0)
+
+    def waitUntilThread2Waiting(): Unit = {
+      while (true) {
+        Thread.sleep(100)
+        val t2 = THREAD2_HOLDER.get()
+        if (t2 != null && t2.getState.equals(Thread.State.WAITING)) {
+          return
+        }
+      }
+    }
+
+    override def load(key: String): String = {
+      startLoading.release()
+      if (Thread.currentThread().getName.contains("test-executor1")) {
+        waitUntilThread2Waiting()
+      }
+      if (intentionalFail.get) throw new RuntimeException(FAIL_MESSAGE)
+      key
+    }
+  }
+}
+
+/**
+ * Test non-fate-sharing behavior
+ */
+class NonFateSharingCacheSuite extends SparkFunSuite {
+
+  type WorkerFunc = () => Unit
+
+  import NonFateSharingCacheSuite._
+
+  test("test LoadingCache") {
+    val loader = new TestCacheLoader
+    val loadingCache: NonFateSharingLoadingCache[String, String] =
+      NonFateSharingCache(CacheBuilder.newBuilder.build(loader))
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      loadingCache.get(TEST_KEY)
+    }
+    val thread2Task: WorkerFunc = () => {
+      loadingCache.get(TEST_KEY)
+    }
+    testImpl(loadingCache, loader, thread1Task, thread2Task)
+  }
+
+  test("test LoadingCache mix usage of default loader and provided loader") {
+    // Intentionally mix usage of default loader and provided value loader.
+    val loader = new TestCacheLoader
+    val loadingCache: NonFateSharingLoadingCache[String, String] =
+      NonFateSharingCache(CacheBuilder.newBuilder.build(loader))
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      loadingCache.get(
+        TEST_KEY,
+        () => loader.load(TEST_KEY)
+      )
+    }
+    val thread2Task: WorkerFunc = () => {
+      loadingCache.get(TEST_KEY)
+    }
+    testImpl(loadingCache, loader, thread1Task, thread2Task)
+  }
+
+  test("test Cache") {
+    val loader = new TestCacheLoader
+    val cache = NonFateSharingCache(CacheBuilder.newBuilder.build[String, String])
+    val thread1Task: WorkerFunc = () => {
+      loader.intentionalFail.set(true)
+      cache.get(
+        TEST_KEY,
+        () => loader.load(TEST_KEY)
+      )
+    }
+    val thread2Task: WorkerFunc = () => {
+      cache.get(
+        TEST_KEY,
+        () => loader.load(TEST_KEY)
+      )
+    }
+    testImpl(cache, loader, thread1Task, thread2Task)
+  }
+
+  def testImpl(
+    cache: NonFateSharingCache[String, String],
+    loader: TestCacheLoader,
+    thread1Task: WorkerFunc,
+    thread2Task: WorkerFunc): Unit = {
+    val executor1 = ThreadUtils.newDaemonSingleThreadExecutor("test-executor1")
+    val executor2 = ThreadUtils.newDaemonSingleThreadExecutor("test-executor2")
+    val f1 = executor1.submit(new Runnable {
+      override def run(): Unit = {
+        thread1Task()
+      }
+    })
+    val f2 = executor2.submit(new Runnable {
+      override def run(): Unit = {
+        loader.startLoading.acquire() // wait until thread1 start loading
+        THREAD2_HOLDER.set(Thread.currentThread())
+        thread2Task()
+      }
+    })

Review Comment:
   Oh, that's right... Spark added a bunch of "helpful" overloads that tried to compensate for older JVMs... but on newer JVMs with runnable interface support, the overloads get in the way.
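   
   As a rough illustration of the point about runnable interface support, here is a minimal sketch, assuming Scala 2.12 or later, where a function literal can be converted to a single-abstract-method interface such as `Runnable`. The object and method names (`RunnableSamDemo`, `runOnce`) are hypothetical and not part of this PR. Ascribing the `Runnable` type explicitly is one way to keep the call unambiguous when other overloads are in scope.
   
   ```scala
   import java.util.concurrent.{Executors, TimeUnit}
   import java.util.concurrent.atomic.AtomicInteger

   // Hypothetical demo object; not part of the PR.
   object RunnableSamDemo {
     def runOnce(): Int = {
       val counter = new AtomicInteger(0)
       val executor = Executors.newSingleThreadExecutor()
       try {
         // The type ascription converts the lambda to a Runnable,
         // so submit(Runnable) is selected rather than submit(Callable).
         val task: Runnable = () => { counter.incrementAndGet(); () }
         // Block until the task has finished (or time out after 5 seconds).
         executor.submit(task).get(5, TimeUnit.SECONDS)
       } finally {
         executor.shutdown()
       }
       counter.get()
     }

     def main(args: Array[String]): Unit = println(runOnce())
   }
   ```
   
   The anonymous `new Runnable { ... }` form used in the test above behaves the same way; it is just more verbose.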





[GitHub] [spark] LuciferYang commented on pull request #40982: [SPARK-43300][CORE] NonFateSharingCache wrapper for Guava Cache

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40982:
URL: https://github.com/apache/spark/pull/40982#issuecomment-1594026021

   @liuzqt @JoshRosen I reported an issue, [SPARK-44064](https://issues.apache.org/jira/browse/SPARK-44064), about a Maven test failure related to this PR.
   
   The `CodeGenerator` in the catalyst module uses the `NonFateSharingCache` in the core module, and the input parameter of `NonFateSharingCache#apply` is `com.google.common.cache.Cache`.
   
   When we run the Maven tests, the catalyst module may use the shaded spark-core jar. In that jar the Guava classes used by the core module are relocated from `com.google.common` to `org.sparkproject.guava`, so the input parameter of `NonFateSharingCache#apply` becomes `org.sparkproject.guava.cache.Cache`. The catalyst module, however, has not been shaded yet at test time, so `CodeGenerator` still calls `NonFateSharingCache#apply` with `com.google.common.cache.Cache`. The input types no longer match, and the Maven test fails.
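   
   To illustrate the mismatch, here is a minimal sketch built from hypothetical stand-ins: `original.Cache`, `relocated.Cache`, `ShadedWrapper`, and `canLink` are not Spark or Guava names. It relies only on the fact that a JVM method is looked up by its exact parameter types, so a caller compiled against one package cannot find a method whose signature names the relocated package.
   
   ```scala
   // Hypothetical stand-ins: `original` plays the role of com.google.common.cache,
   // `relocated` plays the role of org.sparkproject.guava.cache after shading.
   object original { trait Cache }
   object relocated { trait Cache }

   // Stand-in for a class in the shaded jar: its signature names the relocated type.
   class ShadedWrapper {
     def wrap(cache: relocated.Cache): String = "wrapped"
   }

   object ShadingMismatchDemo {
     // Look up `wrap` using the parameter type the caller was compiled against.
     def canLink(paramType: Class[_]): Boolean =
       try {
         classOf[ShadedWrapper].getMethod("wrap", paramType)
         true
       } catch {
         case _: NoSuchMethodException => false
       }

     def main(args: Array[String]): Unit = {
       println(canLink(classOf[relocated.Cache])) // true: signature matches
       println(canLink(classOf[original.Cache]))  // false: same simple name, different type
     }
   }
   ```
   
   One way to sidestep this kind of mismatch is to keep third-party types out of signatures that cross module boundaries, for example by accepting a plain Scala function. That is only a design observation, not necessarily how the issue will be resolved.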
   
   The steps to reproduce are as follows:
   
   ```
    build/mvn clean install -DskipTests -pl sql/catalyst -am
    build/mvn test -pl sql/catalyst 
   ```
   
   The catalyst module Maven test then aborts as follows:
   
   ```
   ProductAggSuite:
   *** RUN ABORTED ***
     java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$
     at org.apache.spark.sql.catalyst.expressions.codegen.JavaCode$.variable(javaCode.scala:64)
     at org.apache.spark.sql.catalyst.expressions.codegen.JavaCode$.isNullVariable(javaCode.scala:77)
     at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:200)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.$anonfun$create$1(GenerateSafeProjection.scala:156)
     at scala.collection.immutable.List.map(List.scala:293)
     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:153)
     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:39)
     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1369) 
   ```
   
   After I reverted this patch locally, the above Maven test passed.
   
   Also cc @HyukjinKwon for awareness.

