You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ad...@apache.org on 2014/09/29 21:38:33 UTC

git commit: Add more debug message for ManagedBuffer

Repository: spark
Updated Branches:
  refs/heads/master dab1b0ae2 -> e43c72fe0


Add more debug message for ManagedBuffer

This is to help debug the error reported at http://apache-spark-user-list.1001560.n3.nabble.com/SQL-queries-fail-in-1-2-0-SNAPSHOT-td15327.html

Author: Reynold Xin <rx...@apache.org>

Closes #2580 from rxin/buffer-debug and squashes the following commits:

5814292 [Reynold Xin] Logging close() in case close() fails.
323dfec [Reynold Xin] Add more debug message.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e43c72fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e43c72fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e43c72fe

Branch: refs/heads/master
Commit: e43c72fe04d4fbf2a108b456d533e641b71b0a2a
Parents: dab1b0a
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Sep 29 12:38:24 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Sep 29 12:38:24 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/network/ManagedBuffer.scala    | 43 +++++++++++++++++---
 .../scala/org/apache/spark/util/Utils.scala     | 14 +++++++
 2 files changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e43c72fe/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index e990c1d..a440918 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.network
 
-import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
+import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.channels.FileChannel.MapMode
 
+import scala.util.Try
+
 import com.google.common.io.ByteStreams
 import io.netty.buffer.{ByteBufInputStream, ByteBuf}
 
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 
 
 /**
@@ -71,18 +73,47 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     try {
       channel = new RandomAccessFile(file, "r").getChannel
       channel.map(MapMode.READ_ONLY, offset, length)
+    } catch {
+      case e: IOException =>
+        Try(channel.size).toOption match {
+          case Some(fileLen) =>
+            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+          case None =>
+            throw new IOException(s"Error in opening $this", e)
+        }
     } finally {
       if (channel != null) {
-        channel.close()
+        Utils.tryLog(channel.close())
       }
     }
   }
 
   override def inputStream(): InputStream = {
-    val is = new FileInputStream(file)
-    is.skip(offset)
-    ByteStreams.limit(is, length)
+    var is: FileInputStream = null
+    try {
+      is = new FileInputStream(file)
+      is.skip(offset)
+      ByteStreams.limit(is, length)
+    } catch {
+      case e: IOException =>
+        if (is != null) {
+          Utils.tryLog(is.close())
+        }
+        Try(file.length).toOption match {
+          case Some(fileLen) =>
+            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+          case None =>
+            throw new IOException(s"Error in opening $this", e)
+        }
+      case e: Throwable =>
+        if (is != null) {
+          Utils.tryLog(is.close())
+        }
+        throw e
+    }
   }
+
+  override def toString: String = s"${getClass.getName}($file, $offset, $length)"
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e43c72fe/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2755887..10d4408 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1304,6 +1304,20 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** Executes the given block in a Try, logging any uncaught exceptions. */
+  def tryLog[T](f: => T): Try[T] = {
+    try {
+      val res = f
+      scala.util.Success(res)
+    } catch {
+      case ct: ControlThrowable =>
+        throw ct
+      case t: Throwable =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        scala.util.Failure(t)
+    }
+  }
+
   /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
   def isFatalError(e: Throwable): Boolean = {
     e match {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org