You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/12/11 20:39:32 UTC

[spark] branch branch-2.4 updated (e80577c -> e35d287)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.


    from e80577c  [SPARK-26307][SQL] Fix CTAS when INSERT a partitioned table using Hive serde
     new 2f038d0  [SPARK-26327][SQL][BACKPORT-2.4] Bug fix for `FileSourceScanExec` metrics update
     new e35d287  [SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 12 +++++-
 .../apache/spark/memory/TestMemoryConsumer.java    |  4 +-
 .../unsafe/map/AbstractBytesToBytesMapSuite.java   | 47 ++++++++++++++++++++++
 .../spark/sql/execution/DataSourceScanExec.scala   | 26 ++++++++----
 .../sql/execution/metric/SQLMetricsSuite.scala     | 15 +++++++
 5 files changed, 93 insertions(+), 11 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[spark] 01/02: [SPARK-26327][SQL][BACKPORT-2.4] Bug fix for `FileSourceScanExec` metrics update

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 2f038d0cefb39cb2b92c22ff8dc130417f342910
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Tue Dec 11 10:03:47 2018 -0800

    [SPARK-26327][SQL][BACKPORT-2.4] Bug fix for `FileSourceScanExec` metrics update
    
    ## What changes were proposed in this pull request?
    
    Backport #23277 to branch 2.4 without the metrics renaming.
    
    ## How was this patch tested?
    
    New test case in `SQLMetricsSuite`.
    
    Closes #23287 from xuanyuanking/SPARK-26327-2.4.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 26 +++++++++++++++-------
 .../sql/execution/metric/SQLMetricsSuite.scala     | 15 +++++++++++++
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 36ed016..5433c30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -185,19 +185,14 @@ case class FileSourceScanExec(
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf)
 
+  private var metadataTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    metadataTime = timeTakenMs
     ret
   }
 
@@ -308,6 +303,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update metrics for taking effect in both code generation node and normal node.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -524,6 +521,19 @@ case class FileSourceScanExec(
     }
   }
 
+  /**
+   * Send the updated metrics to the driver. By the time this function is called,
+   * selectedPartitions has been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics() = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(metadataTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 085a445..c550bf2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -570,4 +570,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       }
     }
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+    withTable("testDataForScan") {
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").saveAsTable("testDataForScan")
+      // The execution plan only has 1 FileScan node.
+      val df = spark.sql(
+        "SELECT * FROM testDataForScan WHERE p = 1")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> (("Scan parquet default.testdataforscan", Map(
+          "number of output rows" -> 3L,
+          "number of files" -> 2L))))
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[spark] 02/02: [SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

commit e35d287dd9fd5b7bd7e06025f535772b482b443c
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Dec 11 12:22:58 2018 -0800

    [SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
    
    ## What changes were proposed in this pull request?
    
    In `BytesToBytesMap.MapIterator.advanceToNextPage`, we first lock this `MapIterator` and then `TaskMemoryManager` when going to free a memory page by calling `freePage`. At the same time, it is possible that another memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it acquires memory and causes spilling on this `MapIterator`.
    
    This ends in a deadlock: the thread advancing the `MapIterator` holds the lock on the `MapIterator` object and waits for the lock on `TaskMemoryManager`, while the other consumer holds the lock on `TaskMemoryManager` and waits for the lock on the `MapIterator` object.
    
    To avoid the deadlock, this patch keeps a reference to the page to be freed and frees it after releasing the lock on the `MapIterator`.
    
    This backports the fix to branch-2.4.
    
    ## How was this patch tested?
    
    Added a test, and manually ran it 100 times to make sure there is no deadlock.
    
    Closes #23289 from viirya/SPARK-26265-2.4.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 12 +++++-
 .../apache/spark/memory/TestMemoryConsumer.java    |  4 +-
 .../unsafe/map/AbstractBytesToBytesMapSuite.java   | 47 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9b6cbab..6465033 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -267,11 +267,18 @@ public final class BytesToBytesMap extends MemoryConsumer {
     }
 
     private void advanceToNextPage() {
+      // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
+      // to free a memory page by calling `freePage`. At the same time, it is possible that another
+      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+      // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
+      // reference to the page to free and free it after releasing the lock of `MapIterator`.
+      MemoryBlock pageToFree = null;
+
       synchronized (this) {
         int nextIdx = dataPages.indexOf(currentPage) + 1;
         if (destructive && currentPage != null) {
           dataPages.remove(currentPage);
-          freePage(currentPage);
+          pageToFree = currentPage;
           nextIdx --;
         }
         if (dataPages.size() > nextIdx) {
@@ -295,6 +302,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
           }
         }
       }
+      if (pageToFree != null) {
+        freePage(pageToFree);
+      }
     }
 
     @Override
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 0bbaea6..6aa577d 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -38,12 +38,12 @@ public class TestMemoryConsumer extends MemoryConsumer {
     return used;
   }
 
-  void use(long size) {
+  public void use(long size) {
     long got = taskMemoryManager.acquireExecutionMemory(size, this);
     used += got;
   }
 
-  void free(long size) {
+  public void free(long size) {
     used -= size;
     taskMemoryManager.releaseExecutionMemory(size, this);
   }
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 53a233f..278d28f 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -33,6 +33,8 @@ import org.mockito.MockitoAnnotations;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.network.util.JavaUtils;
@@ -667,4 +669,49 @@ public abstract class AbstractBytesToBytesMapSuite {
     }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+    memoryManager.limit(PAGE_SIZE_BYTES);
+    MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
+    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
+
+    Thread thread = new Thread(() -> {
+      int i = 0;
+      long used = 0;
+      while (i < 10) {
+        c1.use(10000000);
+        used += 10000000;
+        i++;
+      }
+      c1.free(used);
+    });
+
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {
+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+      }
+
+      // Starts to require memory at another memory consumer.
+      thread.start();
+
+      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+      for (i = 0; i < 1024; i++) {
+        iter.next();
+      }
+      assertFalse(iter.hasNext());
+    } finally {
+      map.free();
+      thread.join();
+      for (File spillFile : spillFilesCreated) {
+        assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
+          spillFile.exists());
+      }
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org