You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/04/28 08:48:44 UTC

spark git commit: [SPARK-5946] [STREAMING] Add Python API for direct Kafka stream

Repository: spark
Updated Branches:
  refs/heads/master 29576e786 -> 9e4e82b7b


[SPARK-5946] [STREAMING] Add Python API for direct Kafka stream

This currently adds only the `createDirectStream` API. I'm not sure whether `createRDD` is also needed, since some Java objects would need to be wrapped in Python. Please help review, thanks a lot.

Author: jerryshao <sa...@intel.com>
Author: Saisai Shao <sa...@intel.com>

Closes #4723 from jerryshao/direct-kafka-python-api and squashes the following commits:

a1fe97c [jerryshao] Fix rebase issue
eebf333 [jerryshao] Address the comments
da40f4e [jerryshao] Fix Python 2.6 Syntax error issue
5c0ee85 [jerryshao] Style fix
4aeac18 [jerryshao] Fix bug in example code
7146d86 [jerryshao] Add unit test
bf3bdd6 [jerryshao] Add more APIs and address the comments
f5b3801 [jerryshao] Small style fix
8641835 [Saisai Shao] Rebase and update the code
589c05b [Saisai Shao] Fix the style
d6fcb6a [Saisai Shao] Address the comments
dfda902 [Saisai Shao] Style fix
0f7d168 [Saisai Shao] Add the doc and fix some style issues
67e6880 [Saisai Shao] Fix test bug
917b0db [Saisai Shao] Add Python createRDD API for Kafka direct stream
c3fc11d [jerryshao] Modify the docs
2c00936 [Saisai Shao] address the comments
3360f44 [jerryshao] Fix code style
e0e0f0d [jerryshao] Code clean and bug fix
338c41f [Saisai Shao] Add python API and example for direct kafka stream


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e4e82b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e4e82b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e4e82b7

Branch: refs/heads/master
Commit: 9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da
Parents: 29576e7
Author: jerryshao <sa...@intel.com>
Authored: Mon Apr 27 23:48:02 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Apr 27 23:48:02 2015 -0700

----------------------------------------------------------------------
 .../python/streaming/direct_kafka_wordcount.py  |  55 ++++++
 .../spark/streaming/kafka/KafkaUtils.scala      |  92 +++++++++-
 python/pyspark/streaming/kafka.py               | 167 ++++++++++++++++++-
 python/pyspark/streaming/tests.py               |  84 +++++++++-
 4 files changed, 383 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e4e82b7/examples/src/main/python/streaming/direct_kafka_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/direct_kafka_wordcount.py b/examples/src/main/python/streaming/direct_kafka_wordcount.py
new file mode 100644
index 0000000..6ef188a
--- /dev/null
+++ b/examples/src/main/python/streaming/direct_kafka_wordcount.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka every 2 seconds.
+ Usage: direct_kafka_wordcount.py <broker_list> <topic>
+
+ To run this on your local machine, you need to setup Kafka and create a producer first, see
+ http://kafka.apache.org/documentation.html#quickstart
+
+ and then run the example
+    `$ bin/spark-submit --jars external/kafka-assembly/target/scala-*/\
+      spark-streaming-kafka-assembly-*.jar \
+      examples/src/main/python/streaming/direct_kafka_wordcount.py \
+      localhost:9092 test`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kafka import KafkaUtils
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: direct_kafka_wordcount.py <broker_list> <topic>"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
+    ssc = StreamingContext(sc, 2)
+
+    brokers, topic = sys.argv[1:]
+    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
+    lines = kvs.map(lambda x: x[1])
+    counts = lines.flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/9e4e82b7/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 5a9bd42..0721dda 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -21,6 +21,7 @@ import java.lang.{Integer => JInt}
 import java.lang.{Long => JLong}
 import java.util.{Map => JMap}
 import java.util.{Set => JSet}
+import java.util.{List => JList}
 
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._
@@ -234,7 +235,6 @@ object KafkaUtils {
     new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
   }
 
-
   /**
    * Create a RDD from Kafka using offset ranges for each topic and partition.
    *
@@ -558,4 +558,94 @@ private class KafkaUtilsPythonHelper {
       topics,
       storageLevel)
   }
+
+  def createRDD(
+      jsc: JavaSparkContext,
+      kafkaParams: JMap[String, String],
+      offsetRanges: JList[OffsetRange],
+      leaders: JMap[TopicAndPartition, Broker]): JavaPairRDD[Array[Byte], Array[Byte]] = {
+    val messageHandler = new JFunction[MessageAndMetadata[Array[Byte], Array[Byte]],
+      (Array[Byte], Array[Byte])] {
+      def call(t1: MessageAndMetadata[Array[Byte], Array[Byte]]): (Array[Byte], Array[Byte]) =
+        (t1.key(), t1.message())
+    }
+
+    val jrdd = KafkaUtils.createRDD[
+      Array[Byte],
+      Array[Byte],
+      DefaultDecoder,
+      DefaultDecoder,
+      (Array[Byte], Array[Byte])](
+        jsc,
+        classOf[Array[Byte]],
+        classOf[Array[Byte]],
+        classOf[DefaultDecoder],
+        classOf[DefaultDecoder],
+        classOf[(Array[Byte], Array[Byte])],
+        kafkaParams,
+        offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
+        leaders,
+        messageHandler
+      )
+    new JavaPairRDD(jrdd.rdd)
+  }
+
+  def createDirectStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JSet[String],
+      fromOffsets: JMap[TopicAndPartition, JLong]
+    ): JavaPairInputDStream[Array[Byte], Array[Byte]] = {
+
+    if (!fromOffsets.isEmpty) {
+      import scala.collection.JavaConversions._
+      val topicsFromOffsets = fromOffsets.keySet().map(_.topic)
+      if (topicsFromOffsets != topics.toSet) {
+        throw new IllegalStateException(s"The specified topics: ${topics.toSet.mkString(" ")} " +
+          s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}")
+      }
+    }
+
+    if (fromOffsets.isEmpty) {
+      KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+        jssc,
+        classOf[Array[Byte]],
+        classOf[Array[Byte]],
+        classOf[DefaultDecoder],
+        classOf[DefaultDecoder],
+        kafkaParams,
+        topics)
+    } else {
+      val messageHandler = new JFunction[MessageAndMetadata[Array[Byte], Array[Byte]],
+        (Array[Byte], Array[Byte])] {
+        def call(t1: MessageAndMetadata[Array[Byte], Array[Byte]]): (Array[Byte], Array[Byte]) =
+          (t1.key(), t1.message())
+      }
+
+      val jstream = KafkaUtils.createDirectStream[
+        Array[Byte],
+        Array[Byte],
+        DefaultDecoder,
+        DefaultDecoder,
+        (Array[Byte], Array[Byte])](
+          jssc,
+          classOf[Array[Byte]],
+          classOf[Array[Byte]],
+          classOf[DefaultDecoder],
+          classOf[DefaultDecoder],
+          classOf[(Array[Byte], Array[Byte])],
+          kafkaParams,
+          fromOffsets,
+          messageHandler)
+      new JavaPairInputDStream(jstream.inputDStream)
+    }
+  }
+
+  def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, untilOffset: JLong
+    ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset)
+
+  def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition =
+    TopicAndPartition(topic, partition)
+
+  def createBroker(host: String, port: JInt): Broker = Broker(host, port)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e4e82b7/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 8d610d6..e278b29 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -17,11 +17,12 @@
 
 from py4j.java_gateway import Py4JJavaError
 
+from pyspark.rdd import RDD
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer
 from pyspark.streaming import DStream
 
-__all__ = ['KafkaUtils', 'utf8_decoder']
+__all__ = ['Broker', 'KafkaUtils', 'OffsetRange', 'TopicAndPartition', 'utf8_decoder']
 
 
 def utf8_decoder(s):
@@ -67,7 +68,104 @@ class KafkaUtils(object):
         except Py4JJavaError as e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print("""
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
+
+    @staticmethod
+    def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
+                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        .. note:: Experimental
+
+        Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
+
+        This is not a receiver-based Kafka input stream; it pulls messages directly from Kafka
+        in each batch duration and processes them without storing them.
+
+        This does not use Zookeeper to store offsets. The consumed offsets are tracked
+        by the stream itself. For interoperability with Kafka monitoring tools that depend on
+        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+        You can access the offsets used in each batch from the generated RDDs.
+
+        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
+        The information on consumed offset can be recovered from the checkpoint.
+        See the programming guide for details (constraints, etc.).
+
+        :param ssc:  StreamingContext object.
+        :param topics:  list of topic_name to consume.
+        :param kafkaParams: Additional params for Kafka.
+        :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
+                            point of the stream.
+        :param keyDecoder:  A function used to decode key (default is utf8_decoder).
+        :param valueDecoder:  A function used to decode value (default is utf8_decoder).
+        :return: A DStream object
+        """
+        if not isinstance(topics, list):
+            raise TypeError("topics should be list")
+        if not isinstance(kafkaParams, dict):
+            raise TypeError("kafkaParams should be dict")
+
+        try:
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+            helper = helperClass.newInstance()
+
+            jfromOffsets = dict([(k._jTopicAndPartition(helper),
+                                  v) for (k, v) in fromOffsets.items()])
+            jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
+
+    @staticmethod
+    def createRDD(sc, kafkaParams, offsetRanges, leaders={},
+                  keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        .. note:: Experimental
+
+        Create a RDD from Kafka using offset ranges for each topic and partition.
+        :param sc:  SparkContext object
+        :param kafkaParams: Additional params for Kafka
+        :param offsetRanges:  list of offsetRange to specify topic:partition:[start, end) to consume
+        :param leaders: Kafka brokers for each TopicAndPartition in offsetRanges.  May be an empty
+                        map, in which case leaders will be looked up on the driver.
+        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
+        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
+        :return: A RDD object
+        """
+        if not isinstance(kafkaParams, dict):
+            raise TypeError("kafkaParams should be dict")
+        if not isinstance(offsetRanges, list):
+            raise TypeError("offsetRanges should be list")
+
+        try:
+            helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            joffsetRanges = [o._jOffsetRange(helper) for o in offsetRanges]
+            jleaders = dict([(k._jTopicAndPartition(helper),
+                              v._jBroker(helper)) for (k, v) in leaders.items()])
+            jrdd = helper.createRDD(sc._jsc, kafkaParams, joffsetRanges, jleaders)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                KafkaUtils._printErrorMsg(sc)
+            raise e
+
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        rdd = RDD(jrdd, sc, ser)
+        return rdd.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
+
+    @staticmethod
+    def _printErrorMsg(sc):
+        print("""
 ________________________________________________________________________________________________
 
   Spark Streaming's Kafka libraries not found in class path. Try one of the following.
@@ -85,8 +183,63 @@ ________________________________________________________________________________
 
 ________________________________________________________________________________________________
 
-""" % (ssc.sparkContext.version, ssc.sparkContext.version))
-            raise e
-        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
-        stream = DStream(jstream, ssc, ser)
-        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
+""" % (sc.version, sc.version))
+
+
+class OffsetRange(object):
+    """
+    Represents a range of offsets from a single Kafka TopicAndPartition.
+    """
+
+    def __init__(self, topic, partition, fromOffset, untilOffset):
+        """
+        Create an OffsetRange to represent a range of offsets.
+        :param topic: Kafka topic name.
+        :param partition: Kafka partition id.
+        :param fromOffset: Inclusive starting offset.
+        :param untilOffset: Exclusive ending offset.
+        """
+        self._topic = topic
+        self._partition = partition
+        self._fromOffset = fromOffset
+        self._untilOffset = untilOffset
+
+    def _jOffsetRange(self, helper):
+        return helper.createOffsetRange(self._topic, self._partition, self._fromOffset,
+                                        self._untilOffset)
+
+
+class TopicAndPartition(object):
+    """
+    Represents a specific topic and partition for Kafka.
+    """
+
+    def __init__(self, topic, partition):
+        """
+        Create a Python TopicAndPartition to map to the Java related object
+        :param topic: Kafka topic name.
+        :param partition: Kafka partition id.
+        """
+        self._topic = topic
+        self._partition = partition
+
+    def _jTopicAndPartition(self, helper):
+        return helper.createTopicAndPartition(self._topic, self._partition)
+
+
+class Broker(object):
+    """
+    Represents the host and port info for a Kafka broker.
+    """
+
+    def __init__(self, host, port):
+        """
+        Create a Python Broker to map to the Java related object.
+        :param host: Broker's hostname.
+        :param port: Broker's port.
+        """
+        self._host = host
+        self._port = port
+
+    def _jBroker(self, helper):
+        return helper.createBroker(self._host, self._port)

http://git-wip-us.apache.org/repos/asf/spark/blob/9e4e82b7/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5fa1e5e..7c06c20 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ from itertools import chain
 import time
 import operator
 import tempfile
+import random
 import struct
 from functools import reduce
 
@@ -35,7 +36,7 @@ else:
 
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.kafka import KafkaUtils
+from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
 
 
 class PySparkStreamingTestCase(unittest.TestCase):
@@ -590,9 +591,27 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         super(KafkaStreamTests, self).tearDown()
 
+    def _randomTopic(self):
+        return "topic-%d" % random.randint(0, 10000)
+
+    def _validateStreamResult(self, sendData, stream):
+        result = {}
+        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
+                                                   sum(sendData.values()))):
+            result[i] = result.get(i, 0) + 1
+
+        self.assertEqual(sendData, result)
+
+    def _validateRddResult(self, sendData, rdd):
+        result = {}
+        for i in rdd.map(lambda x: x[1]).collect():
+            result[i] = result.get(i, 0) + 1
+
+        self.assertEqual(sendData, result)
+
     def test_kafka_stream(self):
         """Test the Python Kafka stream API."""
-        topic = "topic1"
+        topic = self._randomTopic()
         sendData = {"a": 3, "b": 5, "c": 10}
 
         self._kafkaTestUtils.createTopic(topic)
@@ -601,13 +620,64 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},
                                          {"auto.offset.reset": "smallest"})
+        self._validateStreamResult(sendData, stream)
 
-        result = {}
-        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
-                                                   sum(sendData.values()))):
-            result[i] = result.get(i, 0) + 1
+    def test_kafka_direct_stream(self):
+        """Test the Python direct Kafka stream API."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2, "c": 3}
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
+                       "auto.offset.reset": "smallest"}
 
-        self.assertEqual(sendData, result)
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
+        self._validateStreamResult(sendData, stream)
+
+    @unittest.skipIf(sys.version >= "3", "long type not support")
+    def test_kafka_direct_stream_from_offset(self):
+        """Test the Python direct Kafka stream API with start offset specified."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2, "c": 3}
+        fromOffsets = {TopicAndPartition(topic, 0): long(0)}
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
+        self._validateStreamResult(sendData, stream)
+
+    @unittest.skipIf(sys.version >= "3", "long type not support")
+    def test_kafka_rdd(self):
+        """Test the Python direct Kafka RDD API."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2}
+        offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
+        self._validateRddResult(sendData, rdd)
+
+    @unittest.skipIf(sys.version >= "3", "long type not support")
+    def test_kafka_rdd_with_leaders(self):
+        """Test the Python direct Kafka RDD API with leaders."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2, "c": 3}
+        offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+        address = self._kafkaTestUtils.brokerAddress().split(":")
+        leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))}
+
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
+        self._validateRddResult(sendData, rdd)
 
 if __name__ == "__main__":
     unittest.main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org