You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/12/22 22:07:05 UTC

spark git commit: [SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available

Repository: spark
Updated Branches:
  refs/heads/master d62da642a -> 7c0ed13d2


[SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available

This commit consolidates some of the exceptions thrown if compression codecs are not available. If a bad configuration string was passed in, a ClassNotFoundException was thrown. Also, if Snappy was not available, it would throw an InvocationTargetException when the codec was being used (not when it was being initialized). Now, an IllegalArgumentException is thrown when a codec is not available at creation time - either because the class does not exist or the codec itself is not available in the system. This will allow us to have a better message and fail faster.

Author: Kostas Sakellis <ko...@cloudera.com>

Closes #3119 from ksakellis/kostas-spark-4079 and squashes the following commits:

9709c7c [Kostas Sakellis] Removed unnecessary Logging class
63bfdd0 [Kostas Sakellis] Removed isAvailable to preserve binary compatibility
1d0ef2f [Kostas Sakellis] [SPARK-4079] [CORE] Added more information to exception
64f3d27 [Kostas Sakellis] [SPARK-4079] [CORE] Code review feedback
52dfa8f [Kostas Sakellis] [SPARK-4079] [CORE] Default to LZF if Snappy not available


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c0ed13d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c0ed13d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c0ed13d

Branch: refs/heads/master
Commit: 7c0ed13d298d9cf66842c667602e2dccb8f5605b
Parents: d62da64
Author: Kostas Sakellis <ko...@cloudera.com>
Authored: Mon Dec 22 13:07:01 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Dec 22 13:07:01 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/io/CompressionCodec.scala  | 27 +++++++++++++++-----
 .../apache/spark/io/CompressionCodecSuite.scala |  6 +++++
 2 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c0ed13d/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 1ac7f4e..f856890 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
+import org.apache.spark.Logging
 
 /**
  * :: DeveloperApi ::
@@ -44,25 +45,33 @@ trait CompressionCodec {
   def compressedInputStream(s: InputStream): InputStream
 }
 
-
 private[spark] object CompressionCodec {
 
+  private val configKey = "spark.io.compression.codec"
   private val shortCompressionCodecNames = Map(
     "lz4" -> classOf[LZ4CompressionCodec].getName,
     "lzf" -> classOf[LZFCompressionCodec].getName,
     "snappy" -> classOf[SnappyCompressionCodec].getName)
 
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
+    createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
     val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
-    val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
-      .getConstructor(classOf[SparkConf])
-    ctor.newInstance(conf).asInstanceOf[CompressionCodec]
+    val codec = try {
+      val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
+        .getConstructor(classOf[SparkConf])
+      Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+    } catch {
+      case e: ClassNotFoundException => None
+      case e: IllegalArgumentException => None
+    }
+    codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
+      s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
   }
 
+  val FALLBACK_COMPRESSION_CODEC = "lzf"
   val DEFAULT_COMPRESSION_CODEC = "snappy"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
@@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 @DeveloperApi
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
+  try {
+    Snappy.getNativeLibraryVersion
+  } catch {
+    case e: Error => throw new IllegalArgumentException
+  }
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
     new SnappyOutputStream(s, blockSize)

http://git-wip-us.apache.org/repos/asf/spark/blob/7c0ed13d/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 25be7f2..8c6035f 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite {
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
+
+  test("bad compression codec") {
+    intercept[IllegalArgumentException] {
+      CompressionCodec.createCodec(conf, "foobar")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org