Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/02 07:16:15 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-238] Support TubeMQ connector for Apache Spark Streaming (#176)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 26262a5  [TUBEMQ-238] Support TubeMQ connector for Apache Spark Streaming (#176)
26262a5 is described below

commit 26262a5678d846d1f108cc90125525c84fe246bd
Author: imarch1 <im...@gmail.com>
AuthorDate: Thu Jul 2 15:16:06 2020 +0800

    [TUBEMQ-238] Support TubeMQ connector for Apache Spark Streaming (#176)
    
    Co-authored-by: shawnqzhang <sh...@tencent.com>
---
 tubemq-connectors/pom.xml                          |   1 +
 .../{ => tubemq-connector-spark}/pom.xml           |  59 ++++----
 .../tubemq/connector/spark/ConsumerConf.scala      | 159 +++++++++++++++++++++
 .../connector/spark/JavaTubeMQProvider.scala       |  53 +++++++
 .../tubemq/connector/spark/ProducerConf.scala      |  76 ++++++++++
 .../tubemq/connector/spark/TubeMQConsumer.scala    | 112 +++++++++++++++
 .../tubemq/connector/spark/TubeMQFunctions.scala   | 115 +++++++++++++++
 .../tubemq/connector/spark/TubeMQProducer.scala    | 119 +++++++++++++++
 .../tubemq/connector/spark/TubeMQProvider.scala    |  79 ++++++++++
 9 files changed, 747 insertions(+), 26 deletions(-)
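
As a quick orientation before the diff: the sketch below shows how the new
connector is meant to be wired into a streaming job. It is a minimal usage
sketch, not code from this commit; the master address, group, and topic names
are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.tubemq.connector.spark.{TubeMQConsumerConf, TubeMQProvider}

    object TubeMQWordCount {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("TubeMQWordCount"), Seconds(10))

        // Consumer settings, built with the fluent setters from ConsumerConf.scala
        val consumerConf = new TubeMQConsumerConf()
          .setMaster("tube-master-host:8715")    // placeholder address
          .setGroup("demo-group")                // placeholder consumer group
          .setTopic("demo-topic")                // placeholder topic
          .setConsumeFromMaxOffset(true)
          .setStorageLevel(StorageLevel.MEMORY_AND_DISK)

        // numReceiver receivers are created and their streams unioned internally
        val lines = new TubeMQProvider(ssc).textStream(consumerConf, 2)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }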

diff --git a/tubemq-connectors/pom.xml b/tubemq-connectors/pom.xml
index d75d2be..dec8438 100644
--- a/tubemq-connectors/pom.xml
+++ b/tubemq-connectors/pom.xml
@@ -32,6 +32,7 @@
     <modules>
         <module>tubemq-connector-flink</module>
         <module>tubemq-connector-flume</module>
+        <module>tubemq-connector-spark</module>
     </modules>
 
     <dependencies>
diff --git a/tubemq-connectors/pom.xml b/tubemq-connectors/tubemq-connector-spark/pom.xml
similarity index 57%
copy from tubemq-connectors/pom.xml
copy to tubemq-connectors/tubemq-connector-spark/pom.xml
index d75d2be..a91cdc3 100644
--- a/tubemq-connectors/pom.xml
+++ b/tubemq-connectors/tubemq-connector-spark/pom.xml
@@ -20,19 +20,18 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
+        <artifactId>tubemq-connectors</artifactId>
         <groupId>org.apache.tubemq</groupId>
-        <artifactId>tubemq</artifactId>
         <version>0.5.0-incubating-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <name>Apache TubeMQ - Connectors</name>
-    <artifactId>tubemq-connectors</artifactId>
-    <packaging>pom</packaging>
-    <modules>
-        <module>tubemq-connector-flink</module>
-        <module>tubemq-connector-flume</module>
-    </modules>
+    <name>Apache TubeMQ - Connectors-spark</name>
+    <artifactId>tubemq-connector-spark</artifactId>
+
+    <properties>
+        <scala.binary.version>2.11</scala.binary.version>
+        <spark.version>2.4.4</spark.version>
+    </properties>
 
     <dependencies>
         <dependency>
@@ -44,22 +43,30 @@
             <groupId>org.apache.tubemq</groupId>
             <artifactId>tubemq-core</artifactId>
         </dependency>
-    </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.1</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                    <encoding>UTF-8</encoding>
-                    <showDeprecation>true</showDeprecation>
-                    <showWarnings>true</showWarnings>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file
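
A note for downstream users: a project consuming the connector from a build of
TubeMQ would declare the new module as a dependency. A sketch; the version must
match the TubeMQ build actually installed:

    <dependency>
        <groupId>org.apache.tubemq</groupId>
        <artifactId>tubemq-connector-spark</artifactId>
        <version>0.5.0-incubating-SNAPSHOT</version>
    </dependency>
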
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ConsumerConf.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ConsumerConf.scala
new file mode 100644
index 0000000..52cf9d3
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ConsumerConf.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.JavaConversions._
+
+abstract class ConsumerConf {
+  private var _group: String = _
+  def group: String = _group
+  def setGroup(value: String): this.type = {
+    _group = value
+    this
+  }
+
+  private var _topic: String = _
+  def topic: String = _topic
+  def setTopic(value: String): this.type = {
+    _topic = value
+    this
+  }
+
+  private var _consumeFromMaxOffset: Boolean = true
+  def consumeFromMaxOffset: Boolean = _consumeFromMaxOffset
+  def setConsumeFromMaxOffset(value: Boolean): this.type = {
+    _consumeFromMaxOffset = value
+    this
+  }
+
+  private var _storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
+  def storageLevel: StorageLevel = _storageLevel
+  def setStorageLevel(value: StorageLevel): this.type = {
+    _storageLevel = value
+    this
+  }
+}
+
+class TubeMQConsumerConf extends ConsumerConf {
+  private var _master: String = _
+  def master: String = _master
+  def setMaster(value: String): this.type = {
+    _master = value
+    this
+  }
+
+  private var _filterAttrs: Array[String] = _
+  private var _filterAttrId: String = _
+  private[this] var _filterOnRemote: Boolean = false
+
+  def filterAttrId: String = _filterAttrId
+  def filterAttrs: Array[String] = _filterAttrs
+
+  def setFilterAttrs(attrId: String, value: Array[String]): this.type = {
+    require(filterAttrId == null, s"Filter attribute id has already been set to $filterAttrId")
+    _filterAttrs = value
+    _filterAttrId = attrId
+    this
+  }
+
+  def setTids(value: Array[String]): this.type = {
+    setFilterAttrs("tid", value)
+  }
+
+  def setINames(value: Array[String]): this.type = {
+    setFilterAttrs("iname", value)
+  }
+
+  private var _includeAttrId: String = null
+  def includeAttrId: String = _includeAttrId
+  def setIncludeAttrId(attrId: String): this.type = {
+    _includeAttrId = attrId
+    this
+  }
+
+  def setIncludeTid(value: Boolean): this.type = {
+    if (value) {
+      setIncludeAttrId("tid")
+    }
+    this
+  }
+
+  def setIncludeIName(value: Boolean): this.type = {
+    if (value) {
+      setIncludeAttrId("iname")
+    }
+    this
+  }
+
+  def filterOnRemote: Boolean = _filterOnRemote
+
+  def setFilterOnRemote(value: Boolean): this.type = {
+    _filterOnRemote = value
+    this
+  }
+
+  // for the Python API
+  def buildFrom(
+      master: String,
+      group: String,
+      topic: String,
+      tids: java.util.List[String],
+      consumeFromMaxOffset: Boolean,
+      includeTid: Boolean,
+      storageLevel: StorageLevel): this.type = {
+    buildFrom(master, group, topic, tids, "tid", if (includeTid) "tid" else null, consumeFromMaxOffset, storageLevel)
+  }
+
+  // for the Python API
+  def buildFrom(
+      master: String,
+      group: String,
+      topic: String,
+      filterAttrs: java.util.List[String],
+      filterAttrId: String,
+      includeAttrId: String,
+      consumeFromMaxOffset: Boolean,
+      storageLevel: StorageLevel): this.type = {
+    buildFrom(master, group, topic, filterAttrs, filterAttrId, false, includeAttrId, consumeFromMaxOffset, storageLevel)
+  }
+
+  def buildFrom(
+      master: String,
+      group: String,
+      topic: String,
+      filterAttrs: java.util.List[String],
+      filterAttrId: String,
+      filterOnRemote: Boolean,
+      includeAttrId: String,
+      consumeFromMaxOffset: Boolean,
+      storageLevel: StorageLevel): this.type = {
+    _master = master
+    setFilterAttrs(filterAttrId, if (filterAttrs != null) filterAttrs.toList.toArray else null)
+    setFilterOnRemote(filterOnRemote)
+    setIncludeAttrId(includeAttrId)
+    setGroup(group)
+    setTopic(topic)
+    setConsumeFromMaxOffset(consumeFromMaxOffset)
+    setStorageLevel(storageLevel)
+    this
+  }
+}
+
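
The filter-related setters above compose as follows; a usage sketch with
placeholder values (not code from this commit):

    val consumerConf = new TubeMQConsumerConf()
      .setMaster("tube-master-host:8715")   // placeholder
      .setGroup("demo-group")
      .setTopic("demo-topic")
      .setTids(Array("tid-1", "tid-2"))     // registers "tid" as the filter attribute id
      .setIncludeTid(true)                  // equivalent to setIncludeAttrId("tid")
      .setFilterOnRemote(true)              // ask the broker to filter server-side

    // A later call with a different attribute id trips the require() guard:
    // consumerConf.setINames(Array("a"))   // IllegalArgumentException
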
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/JavaTubeMQProvider.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/JavaTubeMQProvider.scala
new file mode 100644
index 0000000..8ebfb6a
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/JavaTubeMQProvider.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+
+class JavaTubeMQProvider private(tubeProvider: TubeMQProvider) {
+  def this(jssc: JavaStreamingContext) = {
+    this(new TubeMQProvider(jssc.ssc))
+  }
+
+  def bytesStream(
+      config: ConsumerConf,
+      numReceiver: Int): JavaDStream[Array[Byte]] = {
+    tubeProvider.bytesStream(config, numReceiver)
+  }
+
+  def textStream(
+      config: ConsumerConf,
+      numReceiver: Int): JavaDStream[String] = {
+    tubeProvider.textStream(config, numReceiver)
+  }
+
+  def saveBytesStreamToTDBank(
+      dstream: JavaDStream[Array[Byte]],
+      config: ProducerConf): Unit = {
+    tubeProvider.saveBytesStreamToTDBank(dstream.dstream, config)
+  }
+
+  def saveTextStreamToTDBank(
+      dstream: JavaDStream[String],
+      config: ProducerConf): Unit = {
+    tubeProvider.saveTextStreamToTDBank(dstream.dstream, config)
+  }
+}
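
The Java wrapper above only delegates to TubeMQProvider. A rough sketch of
driving it (written in Scala against the Java API to stay consistent with the
other examples; all names are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.Durations
    import org.apache.spark.streaming.api.java.JavaStreamingContext
    import org.apache.tubemq.connector.spark.{JavaTubeMQProvider, TubeMQConsumerConf}

    val jssc = new JavaStreamingContext(new SparkConf().setAppName("demo"), Durations.seconds(10))
    val provider = new JavaTubeMQProvider(jssc)
    val consumerConf = new TubeMQConsumerConf()
      .setMaster("tube-master-host:8715")   // placeholder
      .setGroup("demo-group")
      .setTopic("demo-topic")
    val bytes = provider.bytesStream(consumerConf, 2)   // JavaDStream[Array[Byte]]
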
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ProducerConf.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ProducerConf.scala
new file mode 100644
index 0000000..7394f40
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ProducerConf.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+// Holds the common producer (sender) parameters
+abstract class ProducerConf extends Serializable {
+}
+
+class TubeMQProducerConf extends ProducerConf {
+  private var _topic: String = _
+  def topic: String = _topic
+  def setTopic(value: String): this.type = {
+    _topic = value
+    this
+  }
+
+  private var _masterHostAndPort: String = _
+  def masterHostAndPort: String = _masterHostAndPort
+  def setMasterHostAndPort(value: String): this.type = {
+    _masterHostAndPort = value
+    this
+  }
+
+  private var _timeout: Long = 20000 // 20s
+  def timeout: Long = _timeout
+  def setTimeout(value: Long): this.type = {
+    _timeout = value
+    this
+  }
+
+  private var _maxRetryTimes: Int = 3
+  def maxRetryTimes: Int = _maxRetryTimes
+  def setMaxRetryTimes(value: Int): this.type = {
+    _maxRetryTimes = value
+    this
+  }
+
+  private var _exitOnException: Boolean = true
+  def exitOnException: Boolean = _exitOnException
+  def setExitOnException(value: Boolean): this.type = {
+    _exitOnException = value
+    this
+  }
+
+  // for the Python API
+  def buildFrom(
+      topic: String,
+      masterHostAndPort: String,
+      timeout: Int,
+      maxRetryTimes: Int,
+      exitOnException: Boolean): this.type = {
+    _topic = topic
+    _masterHostAndPort = masterHostAndPort
+    _timeout = timeout
+    _maxRetryTimes = maxRetryTimes
+    _exitOnException = exitOnException
+    this
+  }
+}
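
For the producer side, the configuration sketch mirrors the consumer one;
values are placeholders and the defaults are spelled out for illustration:

    val producerConf = new TubeMQProducerConf()
      .setTopic("demo-out-topic")                     // placeholder topic
      .setMasterHostAndPort("tube-master-host:8715")  // placeholder address
      .setTimeout(20000)                              // default: 20s
      .setMaxRetryTimes(3)                            // default
      .setExitOnException(true)                       // default
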
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQConsumer.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQConsumer.scala
new file mode 100644
index 0000000..88881ce
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQConsumer.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import java.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+import org.apache.tubemq.client.config.{ConsumerConfig, TubeClientConfig}
+import org.apache.tubemq.client.consumer.MessageListener
+import org.apache.tubemq.client.factory.{MessageSessionFactory, TubeSingleSessionFactory}
+import org.apache.tubemq.corebase.Message
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class TubeMQConsumer(
+    master: String,
+    group: String,
+    topic: String,
+    filterAttrs: Seq[String],
+    filterAttrId:String,
+    filterOnRemote:Boolean,
+    includeAttrId:String,
+    consumeFromMaxOffset: Boolean,
+    storageLevel: StorageLevel)
+  extends Receiver[Array[Byte]](storageLevel) {
+  self =>
+
+  def this(receiverConfig: TubeMQConsumerConf) = {
+    this(receiverConfig.master,
+      receiverConfig.group,
+      receiverConfig.topic,
+      receiverConfig.filterAttrs,
+      receiverConfig.filterAttrId,
+      receiverConfig.filterOnRemote,
+      receiverConfig.includeAttrId,
+      receiverConfig.consumeFromMaxOffset,
+      receiverConfig.storageLevel)
+  }
+
+  require(master != null, "'master' must be set.")
+  require(group != null, "'group' must be set.")
+  require(topic != null, "'topic' must be set.")
+
+  @transient
+  var messageSessionFactory: MessageSessionFactory = null
+
+  override def onStart(): Unit = {
+    val config = new TubeClientConfig(master)
+    messageSessionFactory = new TubeSingleSessionFactory(config)
+    val consumerConfig = new ConsumerConfig(master, group)
+    val consumeModel = if (consumeFromMaxOffset) 1 else 0
+    consumerConfig.setConsumeModel(consumeModel)
+    val consumer = messageSessionFactory.createPushConsumer(consumerConfig)
+    if (filterOnRemote && filterAttrs != null) {
+      consumer.subscribe(topic, new util.TreeSet[String](filterAttrs), new SimpleMessageListener(topic, filterAttrs,
+        filterAttrId, includeAttrId))
+    } else {
+      consumer.subscribe(topic, null, new SimpleMessageListener(topic, filterAttrs, filterAttrId, includeAttrId))
+    }
+    consumer.completeSubscribe()
+  }
+
+  override def onStop(): Unit = {
+    if (messageSessionFactory != null) {
+      messageSessionFactory.shutdown()
+      messageSessionFactory = null
+    }
+  }
+
+  class SimpleMessageListener(topic: String, filterAttrs: Seq[String], filterAttrId: String, includeAttrId: String)
+    extends MessageListener {
+
+    override def receiveMessages(messages: util.List[Message]): Unit = {
+      messages.asScala.foreach(msg => processMessage(msg))
+    }
+
+    def processMessage(message: Message): Unit = {
+      if (!topic.equals(message.getTopic)) {
+        throw new SparkException(
+          s"topic error, the message topic is ${message.getTopic}, expected $topic")
+      }
+
+      // Hand the message body to Spark. Attribute-based filtering of
+      // message.getData is left as a placeholder in this commit.
+      self.store(message.getData)
+    }
+
+    override def getExecutor = null
+
+    override def stop() = {}
+  }
+}
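
Since TubeMQConsumer is a plain Spark Receiver, it can also be plugged into a
StreamingContext directly, bypassing TubeMQProvider; a one-line sketch reusing
the consumerConf built earlier:

    val byteStream = ssc.receiverStream(new TubeMQConsumer(consumerConf))  // DStream[Array[Byte]]
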
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQFunctions.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQFunctions.scala
new file mode 100644
index 0000000..1b10706
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQFunctions.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkEnv
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.language.implicitConversions
+
+object TubeMQFunctions {
+
+  class ArrayByteRDDFunctions(rdd: RDD[Array[Byte]]) extends Serializable {
+    def saveToTube(config: ProducerConf): Unit = {
+      SparkEnv.get.conf.set("spark.task.maxFailures", "1")
+      SparkEnv.get.conf.set("spark.speculation", "false")
+      config match {
+        case conf: TubeMQProducerConf =>
+          val sender = new TubeMQProducer(conf)
+          val writer = (iter: Iterator[Array[Byte]]) => {
+            sender.start()
+            try {
+              while (iter.hasNext) {
+                val record = iter.next()
+                sender.send(record)
+              }
+            } finally {
+              sender.close()
+            }
+          }
+          rdd.context.runJob(rdd, writer)
+
+        case _ => throw new UnsupportedOperationException("Unknown ProducerConf")
+      }
+    }
+  }
+
+  class StringRDDFunctions(rdd: RDD[String]) extends Serializable {
+    def saveToTube(config: ProducerConf): Unit = {
+      rdd.map(_.getBytes("UTF-8")).saveToTube(config)
+    }
+  }
+
+  class ArrayByteDStreamFunctions(dstream: DStream[Array[Byte]]) extends Serializable {
+    def saveRDD(rdd: RDD[Array[Byte]], sender: TubeMQProducer): Unit = {
+      val writer = (iter: Iterator[Array[Byte]]) => {
+        try {
+          sender.start()
+          while (iter.hasNext) {
+            val record = iter.next()
+            sender.send(record)
+          }
+        }
+        finally {
+          sender.close()
+        }
+      }
+      rdd.context.runJob(rdd, writer)
+    }
+
+    def saveToTube(config: ProducerConf): Unit = {
+      SparkEnv.get.conf.set("spark.task.maxFailures", "1")
+      SparkEnv.get.conf.set("spark.speculation", "false")
+      val sender: TubeMQProducer = config match {
+        case conf: TubeMQProducerConf => new TubeMQProducer(conf)
+        case _ => throw new UnsupportedOperationException("Unknown ProducerConf")
+      }
+      val saveFunc = (rdd: RDD[Array[Byte]], time: Time) => {
+        saveRDD(rdd, sender)
+      }
+      dstream.foreachRDD(saveFunc)
+    }
+  }
+
+  class StringDStreamFunctions(dstream: DStream[String]) extends Serializable {
+    def saveToTube(config: ProducerConf): Unit = {
+      dstream.map(_.getBytes("UTF-8")).saveToTube(config)
+    }
+  }
+
+  implicit def rddToArrayByteRDDFunctions(rdd: RDD[Array[Byte]]): ArrayByteRDDFunctions = {
+    new ArrayByteRDDFunctions(rdd)
+  }
+
+  implicit def rddToStringRDDFunctions(rdd: RDD[String]): StringRDDFunctions = {
+    new StringRDDFunctions(rdd)
+  }
+
+  implicit def dStreamToArrayByteDStreamFunctions(
+      dstream: DStream[Array[Byte]]): ArrayByteDStreamFunctions = {
+    new ArrayByteDStreamFunctions(dstream)
+  }
+
+  implicit def dStreamToStringDStreamFunctions(
+      dstream: DStream[String]): StringDStreamFunctions = {
+    new StringDStreamFunctions(dstream)
+  }
+}
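
Importing the object brings saveToTube into scope on RDDs and DStreams of
Array[Byte] and String via the implicit conversions above; a sketch (the
stream and config come from the earlier placeholder examples):

    import org.apache.tubemq.connector.spark.TubeMQFunctions._

    lines.saveToTube(producerConf)   // DStream[String], serialized as UTF-8 bytes
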
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProducer.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProducer.scala
new file mode 100644
index 0000000..6ad521f
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProducer.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import java.util.HashSet
+
+import org.apache.spark.SparkException
+
+import org.apache.tubemq.client.config.TubeClientConfig
+import org.apache.tubemq.client.factory.TubeSingleSessionFactory
+import org.apache.tubemq.client.producer.{MessageProducer, MessageSentCallback, MessageSentResult}
+import org.apache.tubemq.corebase.Message
+import org.slf4j.{Logger, LoggerFactory}
+
+private[spark] class TubeMQProducer(
+    topic: String,
+    masterHostAndPort: String,
+    maxRetryTimes: Int,
+    exitOnException: Boolean)
+  extends Serializable {
+
+  // slf4j loggers and TubeMQ client objects are not serializable; keep them
+  // out of the serialized state of this otherwise Serializable class
+  @transient private lazy val LOG: Logger = LoggerFactory.getLogger(classOf[TubeMQProducer])
+
+  @transient private var producer: MessageProducer = _
+  @transient private var sessionFactory: TubeSingleSessionFactory = _
+
+  def this(producerConfig: TubeMQProducerConf) = {
+    this(producerConfig.topic, producerConfig.masterHostAndPort, producerConfig.maxRetryTimes, producerConfig.exitOnException)
+  }
+
+  def start(): Unit = {
+    val clientConfig = new TubeClientConfig(masterHostAndPort)
+    sessionFactory = new TubeSingleSessionFactory(clientConfig)
+    producer = sessionFactory.createProducer()
+    val hashSet: HashSet[String] = new HashSet[String]
+    hashSet.add(topic)
+    producer.publish(hashSet)
+  }
+
+  def send(body: Array[Byte]): Unit = {
+    val message: Message = new Message(topic, body)
+    producer.sendMessage(message, new MessageSentCallback {
+      override def onMessageSent(sendResult: MessageSentResult): Unit = {
+        if (!sendResult.isSuccess) {
+          // rollback to sync
+          sendSync(message)
+        }
+      }
+
+      override def onException(throwable: Throwable): Unit = {
+        if (exitOnException) {
+          throw new SparkException("Sender message exception", throwable)
+        } else {
+          LOG.warn("Sender message exception", throwable)
+        }
+      }
+
+    })
+  }
+
+  private def sendSync(message: Message): Unit = {
+    var success = false
+    var retryTimes = 0
+    while (!success && retryTimes < maxRetryTimes) {
+      retryTimes += 1
+      val sendResult = producer.sendMessage(message)
+      if (sendResult.isSuccess) {
+        success = true
+      }
+      // on a failed result, loop and retry until maxRetryTimes is exhausted
+    }
+
+    if (!success) {
+      val error = s"Sender message exception: exceed maxRetryTimes($maxRetryTimes)"
+      if (exitOnException) {
+        throw new SparkException(error)
+      } else {
+        LOG.warn(error)
+      }
+    }
+  }
+
+  def close(): Unit = {
+    try {
+      if (producer != null) {
+        producer.shutdown()
+        producer = null
+      }
+      if (sessionFactory != null) {
+        sessionFactory.shutdown()
+        sessionFactory = null
+      }
+    } catch {
+      case e: Exception =>
+        LOG.error("Shutdown producer error", e)
+    }
+  }
+
+}
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProvider.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProvider.scala
new file mode 100644
index 0000000..c529958
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProvider.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+
+import TubeMQFunctions._
+
+class TubeMQProvider(val ssc: StreamingContext) {
+
+  /**
+   * Receive a byte-array stream from TubeMQ
+   * @param config the consumer configuration for the receivers
+   * @param numReceiver the number of receivers
+   * @return DStream[ Array[Byte] ]
+   */
+  def bytesStream(
+      config: ConsumerConf,
+      numReceiver: Int): DStream[Array[Byte]] = {
+    require(numReceiver >= 1, s"numReceiver must be >= 1, but got $numReceiver")
+    val streams = config match {
+      case conf: TubeMQConsumerConf =>
+        (1 to numReceiver).map { _ =>
+          ssc.receiverStream(new TubeMQConsumer(conf))
+        }
+      case _ =>
+        throw new UnsupportedOperationException("Unknown receiver config.")
+    }
+    ssc.union(streams)
+  }
+
+  /**
+   * Receive a text stream from TubeMQ
+   * @param config the consumer configuration for the receivers
+   * @param numReceiver the number of receivers
+   * @return DStream[String]
+   */
+  def textStream(
+      config: ConsumerConf,
+      numReceiver: Int): DStream[String] = {
+    bytesStream(config, numReceiver).map(x => new String(x, "UTF-8"))
+  }
+
+  /**
+   * Send a byte-array stream to TubeMQ
+   * @param dstream the data to be sent
+   * @param config the producer (sender) config
+   */
+  def saveBytesStreamToTDBank(dstream: DStream[Array[Byte]], config: ProducerConf): Unit = {
+    dstream.saveToTube(config)
+  }
+
+  /**
+   * Send a text stream to TubeMQ
+   * @param dstream the data to be sent
+   * @param config the producer (sender) config
+   */
+  def saveTextStreamToTDBank(dstream: DStream[String], config: ProducerConf): Unit = {
+    dstream.saveToTube(config)
+  }
+}
+
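
Putting both sides together, TubeMQProvider also exposes the save helpers used
in the class above; a closing sketch that relays one topic to another, with
all names carried over from the earlier placeholder examples:

    val provider = new TubeMQProvider(ssc)
    val bytes = provider.bytesStream(consumerConf, 2)
    provider.saveBytesStreamToTDBank(bytes, producerConf)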