Posted to commits@spark.apache.org by an...@apache.org on 2015/02/26 22:47:11 UTC

spark git commit: [SPARK-6027][SPARK-5546] Fixed --jar and --packages not working for KafkaUtils and improved error message

Repository: spark
Updated Branches:
  refs/heads/master 8942b522d -> aa63f633d


[SPARK-6027][SPARK-5546] Fixed --jar and --packages not working for KafkaUtils and improved error message

The problem in SPARK-6027, in short, is that JARs like kafka-assembly.jar do not work from Python because the added JAR is not visible to the classloader used by Py4J. Py4J uses Class.forName(), which does not use the thread's context classloader, but the added JARs are visible only in that context classloader. So this fix uses the context classloader to create the KafkaUtils DStream object. This works in both cases: when the Kafka libraries are added with --jars spark-streaming-kafka-assembly.jar and when they are added with --packages spark-streaming-kafka.

Also improves the error message.
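
The two lookup paths can be compared directly from PySpark through Py4J. Below is a minimal sketch, assuming a live StreamingContext `ssc` whose JVM gateway is reachable via ssc._jvm; aside from the helper class added in this commit, the surrounding script is illustrative only:

    # Fully qualified name of the helper class added by this commit
    name = "org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper"

    # Class.forName() resolves against Py4J's own classloader, which does not
    # see JARs added with --jars or --packages, so this would raise
    # ClassNotFoundException:
    #   ssc._jvm.java.lang.Class.forName(name)

    # The thread's context classloader does see the added JARs:
    loader = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
    helperClass = loader.loadClass(name)
    helper = helperClass.newInstance()  # relies on the zero-arg constructor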

davies

Author: Tathagata Das <ta...@gmail.com>

Closes #4779 from tdas/kafka-python-fix and squashes the following commits:

fb16b04 [Tathagata Das] Removed import
c1fdf35 [Tathagata Das] Fixed long line and improved documentation
7b88be8 [Tathagata Das] Fixed --jar not working for KafkaUtils and improved error message


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa63f633
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa63f633
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa63f633

Branch: refs/heads/master
Commit: aa63f633d39efa8c29095295f161eaad5495071d
Parents: 8942b52
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Feb 26 13:46:07 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Feb 26 13:47:07 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/kafka/KafkaUtils.scala      | 29 +++++++++++++-
 python/pyspark/streaming/kafka.py               | 42 +++++++++++++-------
 2 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa63f633/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index af04bc6..62a6595 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
 
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, StringDecoder}
+import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
 
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.{SparkContext, SparkException}
@@ -532,3 +532,30 @@ object KafkaUtils {
     )
   }
 }
+
+/**
+ * This is a helper class that wraps KafkaUtils.createStream() in a more
+ * Python-friendly class and function so that it can be easily
+ * instantiated and called from Python's KafkaUtils (see SPARK-6027).
+ *
+ * The zero-arg constructor helps instantiate this class from the Class object
+ * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
+ * takes care of the known parameters so they do not have to be passed from Python.
+ */
+private class KafkaUtilsPythonHelper {
+  def createStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+    KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+      jssc,
+      classOf[Array[Byte]],
+      classOf[Array[Byte]],
+      classOf[DefaultDecoder],
+      classOf[DefaultDecoder],
+      kafkaParams,
+      topics,
+      storageLevel)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/aa63f633/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 19ad71f..0002dc1 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -16,7 +16,7 @@
 #
 
 from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError
+from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -50,8 +50,6 @@ class KafkaUtils(object):
         :param valueDecoder:  A function used to decode value (default is utf8_decoder)
         :return: A DStream object
         """
-        java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
-
         kafkaParams.update({
             "zookeeper.connect": zkQuorum,
             "group.id": groupId,
@@ -63,20 +61,34 @@ class KafkaUtils(object):
         jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
 
-        def getClassByName(name):
-            return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
-
         try:
-            array = getClassByName("[B")
-            decoder = getClassByName("kafka.serializer.DefaultDecoder")
-            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
-                                                       jparam, jtopics, jlevel)
-        except Py4JError, e:
+            # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+        except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
-            if not e.message or 'call a package' in e.message:
-                print "No kafka package, please put the assembly jar into classpath:"
-                print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
-                      "scala-*/spark-streaming-kafka-assembly-*.jar"
+            if 'ClassNotFoundException' in str(e.java_exception):
+                print """
+________________________________________________________________________________________________
+
+  Spark Streaming's Kafka libraries not found in class path. Try one of the following.
+
+  1. Include the Kafka library and its dependencies in the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (ssc.sparkContext.version, ssc.sparkContext.version)
             raise e
         ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
         stream = DStream(jstream, ssc, ser)
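
With this change in place, the Python Kafka stream is created the same way regardless of how the Kafka classes were supplied. A minimal usage sketch (the ZooKeeper quorum, group id, app name, and topic map below are placeholder values):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaStreamCheck")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    # zkQuorum, groupId, and the {topic: numPartitions} map are placeholders
    stream = KafkaUtils.createStream(ssc, "localhost:2181", "test-group",
                                     {"test-topic": 1})
    stream.pprint()

    ssc.start()
    ssc.awaitTermination()

submitted with either form from the error message above, e.g.:

    $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> script.py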

