You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ad...@apache.org on 2014/09/29 21:38:33 UTC
git commit: Add more debug messages for ManagedBuffer
Repository: spark
Updated Branches:
refs/heads/master dab1b0ae2 -> e43c72fe0
Add more debug messages for ManagedBuffer
This is to help debug the error reported at http://apache-spark-user-list.1001560.n3.nabble.com/SQL-queries-fail-in-1-2-0-SNAPSHOT-td15327.html
Author: Reynold Xin <rx...@apache.org>
Closes #2580 from rxin/buffer-debug and squashes the following commits:
5814292 [Reynold Xin] Logging close() in case close() fails.
323dfec [Reynold Xin] Add more debug message.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e43c72fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e43c72fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e43c72fe
Branch: refs/heads/master
Commit: e43c72fe04d4fbf2a108b456d533e641b71b0a2a
Parents: dab1b0a
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Sep 29 12:38:24 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Sep 29 12:38:24 2014 -0700
----------------------------------------------------------------------
.../apache/spark/network/ManagedBuffer.scala | 43 +++++++++++++++++---
.../scala/org/apache/spark/util/Utils.scala | 14 +++++++
2 files changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e43c72fe/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index e990c1d..a440918 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -17,15 +17,17 @@
package org.apache.spark.network
-import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
+import java.io._
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode
+import scala.util.Try
+
import com.google.common.io.ByteStreams
import io.netty.buffer.{ByteBufInputStream, ByteBuf}
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
/**
@@ -71,18 +73,47 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
try {
channel = new RandomAccessFile(file, "r").getChannel
channel.map(MapMode.READ_ONLY, offset, length)
+ } catch {
+ case e: IOException =>
+ Try(channel.size).toOption match {
+ case Some(fileLen) =>
+ throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+ case None =>
+ throw new IOException(s"Error in opening $this", e)
+ }
} finally {
if (channel != null) {
- channel.close()
+ Utils.tryLog(channel.close())
}
}
}
override def inputStream(): InputStream = {
- val is = new FileInputStream(file)
- is.skip(offset)
- ByteStreams.limit(is, length)
+ var is: FileInputStream = null
+ try {
+ is = new FileInputStream(file)
+ is.skip(offset)
+ ByteStreams.limit(is, length)
+ } catch {
+ case e: IOException =>
+ if (is != null) {
+ Utils.tryLog(is.close())
+ }
+ Try(file.length).toOption match {
+ case Some(fileLen) =>
+ throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+ case None =>
+ throw new IOException(s"Error in opening $this", e)
+ }
+ case e: Throwable =>
+ if (is != null) {
+ Utils.tryLog(is.close())
+ }
+ throw e
+ }
}
+
+ override def toString: String = s"${getClass.getName}($file, $offset, $length)"
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e43c72fe/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2755887..10d4408 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1304,6 +1304,20 @@ private[spark] object Utils extends Logging {
}
}
+ /** Executes the given block in a Try, logging any uncaught exceptions. */
+ def tryLog[T](f: => T): Try[T] = {
+ try {
+ val res = f
+ scala.util.Success(res)
+ } catch {
+ case ct: ControlThrowable =>
+ throw ct
+ case t: Throwable =>
+ logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+ scala.util.Failure(t)
+ }
+ }
+
/** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
def isFatalError(e: Throwable): Boolean = {
e match {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org