Posted to commits@tubemq.apache.org by go...@apache.org on 2020/07/02 07:16:15 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-238] Support TubeMQ connector for Apache Spark Streaming (#176)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 26262a5 [TUBEMQ-238] Support TubeMQ connector for Apache Spark Streaming (#176)
26262a5 is described below
commit 26262a5678d846d1f108cc90125525c84fe246bd
Author: imarch1 <im...@gmail.com>
AuthorDate: Thu Jul 2 15:16:06 2020 +0800
[TUBEMQ-238] Support TubeMQ connector for Apache Spark Streaming (#176)
Co-authored-by: shawnqzhang <sh...@tencent.com>
---
tubemq-connectors/pom.xml | 1 +
.../{ => tubemq-connector-spark}/pom.xml | 59 ++++----
.../tubemq/connector/spark/ConsumerConf.scala | 159 +++++++++++++++++++++
.../connector/spark/JavaTubeMQProvider.scala | 53 +++++++
.../tubemq/connector/spark/ProducerConf.scala | 76 ++++++++++
.../tubemq/connector/spark/TubeMQConsumer.scala | 112 +++++++++++++++
.../tubemq/connector/spark/TubeMQFunctions.scala | 115 +++++++++++++++
.../tubemq/connector/spark/TubeMQProducer.scala | 119 +++++++++++++++
.../tubemq/connector/spark/TubeMQProvider.scala | 79 ++++++++++
9 files changed, 747 insertions(+), 26 deletions(-)
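
Before the diff itself, a quick orientation: the new module adds a receiver-based consumer (TubeMQConsumer), fluent configuration classes (ConsumerConf/ProducerConf), implicit save functions (TubeMQFunctions), and Scala/Java entry points (TubeMQProvider/JavaTubeMQProvider). A minimal consuming sketch follows; the entry-point object, master address, group, and topic are placeholders, not part of the commit:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.tubemq.connector.spark.{TubeMQConsumerConf, TubeMQProvider}

    object TubeMQDemo {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("tubemq-demo"), Seconds(5))
        // Placeholder master address, consumer group, and topic.
        val consumerConf = new TubeMQConsumerConf()
          .setMaster("tubemq-master:8715")
          .setGroup("demo-group")
          .setTopic("demo-topic")
          .setConsumeFromMaxOffset(true)
          .setStorageLevel(StorageLevel.MEMORY_AND_DISK)
        // Two receivers, unioned into one DStream[String].
        val lines = new TubeMQProvider(ssc).textStream(consumerConf, 2)
        lines.count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
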
diff --git a/tubemq-connectors/pom.xml b/tubemq-connectors/pom.xml
index d75d2be..dec8438 100644
--- a/tubemq-connectors/pom.xml
+++ b/tubemq-connectors/pom.xml
@@ -32,6 +32,7 @@
<modules>
<module>tubemq-connector-flink</module>
<module>tubemq-connector-flume</module>
+ <module>tubemq-connector-spark</module>
</modules>
<dependencies>
diff --git a/tubemq-connectors/pom.xml b/tubemq-connectors/tubemq-connector-spark/pom.xml
similarity index 57%
copy from tubemq-connectors/pom.xml
copy to tubemq-connectors/tubemq-connector-spark/pom.xml
index d75d2be..a91cdc3 100644
--- a/tubemq-connectors/pom.xml
+++ b/tubemq-connectors/tubemq-connector-spark/pom.xml
@@ -20,19 +20,18 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
+ <artifactId>tubemq-connectors</artifactId>
<groupId>org.apache.tubemq</groupId>
- <artifactId>tubemq</artifactId>
<version>0.5.0-incubating-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <name>Apache TubeMQ - Connectors</name>
- <artifactId>tubemq-connectors</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>tubemq-connector-flink</module>
- <module>tubemq-connector-flume</module>
- </modules>
+ <name>Apache TubeMQ - Connectors-spark</name>
+ <artifactId>tubemq-connector-spark</artifactId>
+
+ <properties>
+ <scala.binary.version>2.11</scala.binary.version>
+ <spark.version>2.4.4</spark.version>
+ </properties>
<dependencies>
<dependency>
@@ -44,22 +43,30 @@
<groupId>org.apache.tubemq</groupId>
<artifactId>tubemq-core</artifactId>
</dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ConsumerConf.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ConsumerConf.scala
new file mode 100644
index 0000000..52cf9d3
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ConsumerConf.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.JavaConversions._
+
+abstract class ConsumerConf {
+ private var _group: String = _
+ def group: String = _group
+ def setGroup(value: String): this.type = {
+ _group = value
+ this
+ }
+
+ private var _topic: String = _
+ def topic: String = _topic
+ def setTopic(value: String): this.type = {
+ _topic = value
+ this
+ }
+
+ private var _consumeFromMaxOffset: Boolean = true
+ def consumeFromMaxOffset: Boolean = _consumeFromMaxOffset
+ def setConsumeFromMaxOffset(value: Boolean): this.type = {
+ _consumeFromMaxOffset = value
+ this
+ }
+
+ private var _storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
+ def storageLevel: StorageLevel = _storageLevel
+ def setStorageLevel(value: StorageLevel): this.type = {
+ _storageLevel = value
+ this
+ }
+}
+
+class TubeMQConsumerConf extends ConsumerConf {
+ private var _master: String = _
+ def master: String = _master
+ def setMaster(value: String): this.type = {
+ _master = value
+ this
+ }
+
+ private var _filterAttrs: Array[String] = _
+ private var _filterAttrId: String = _
+ private[this] var _filterOnRemote: Boolean = false
+
+ def filterAttrId: String = _filterAttrId
+ def filterAttrs: Array[String] = _filterAttrs
+
+ def setFilterAttrs(attrId: String, value: Array[String]): this.type = {
+ require(filterAttrId == null, s"Filter attribute id has already been set to $filterAttrId")
+ _filterAttrs = value
+ _filterAttrId = attrId
+ this
+ }
+
+ def setTids(value: Array[String]): this.type = {
+ setFilterAttrs("tid", value)
+ }
+
+ def setINames(value: Array[String]): this.type = {
+ setFilterAttrs("iname", value)
+ }
+
+ private var _includeAttrId: String = null
+ def includeAttrId: String = _includeAttrId
+ def setIncludeAttrId(attrId: String): this.type = {
+ _includeAttrId = attrId
+ this
+ }
+
+ def setIncludeTid(value: Boolean): this.type = {
+ if (value) {
+ setIncludeAttrId("tid")
+ }
+ this
+ }
+
+ def setIncludeIName(value: Boolean): this.type = {
+ if (value) {
+ setIncludeAttrId("iname")
+ }
+ this
+ }
+
+ def filterOnRemote: Boolean = _filterOnRemote
+
+ def setFilterOnRemote(value: Boolean): this.type = {
+ _filterOnRemote = value
+ this
+ }
+
+ // for python api
+ def buildFrom(
+ master: String,
+ group: String,
+ topic: String,
+ tids: java.util.List[String],
+ consumeFromMaxOffset: Boolean,
+ includeTid: Boolean,
+ storageLevel: StorageLevel): this.type = {
+ buildFrom(master, group, topic, tids, "tid", if (includeTid) "tid" else null, consumeFromMaxOffset, storageLevel)
+ }
+
+ // for python api
+ def buildFrom(
+ master: String,
+ group: String,
+ topic: String,
+ filterAttrs: java.util.List[String],
+ filterAttrId: String,
+ includeAttrId: String,
+ consumeFromMaxOffset: Boolean,
+ storageLevel: StorageLevel): this.type = {
+ buildFrom(master, group, topic, filterAttrs, filterAttrId, false, includeAttrId, consumeFromMaxOffset, storageLevel)
+ }
+
+ def buildFrom(
+ master: String,
+ group: String,
+ topic: String,
+ filterAttrs: java.util.List[String],
+ filterAttrId: String,
+ filterOnRemote: Boolean,
+ includeAttrId: String,
+ consumeFromMaxOffset: Boolean,
+ storageLevel: StorageLevel): this.type = {
+ _master = master
+ setFilterAttrs(filterAttrId, if (filterAttrs != null) filterAttrs.toList.toArray else null)
+ setFilterOnRemote(filterOnRemote)
+ setIncludeAttrId(includeAttrId)
+ setGroup(group)
+ setTopic(topic)
+ setConsumeFromMaxOffset(consumeFromMaxOffset)
+ setStorageLevel(storageLevel)
+ this
+ }
+}
+
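
The setters above return this.type, so configurations chain fluently. A sketch of a filtered consumer configuration with placeholder values; setTids is shorthand for setFilterAttrs("tid", ...), setIncludeTid(true) for setIncludeAttrId("tid"), and setFilterOnRemote(true) asks the broker to apply the filter before delivery:

    val filteredConf = new TubeMQConsumerConf()
      .setMaster("tubemq-master:8715")    // placeholder address
      .setGroup("demo-group")
      .setTopic("demo-topic")
      .setTids(Array("tid-01", "tid-02")) // filter on the "tid" attribute
      .setIncludeTid(true)
      .setFilterOnRemote(true)            // filter broker-side, not in the receiver
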
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/JavaTubeMQProvider.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/JavaTubeMQProvider.scala
new file mode 100644
index 0000000..8ebfb6a
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/JavaTubeMQProvider.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+
+class JavaTubeMQProvider private(tubeProvider: TubeMQProvider) {
+ def this(jssc: JavaStreamingContext) = {
+ this(new TubeMQProvider(jssc.ssc))
+ }
+
+ def bytesStream(
+ config: ConsumerConf,
+ numReceiver: Int): JavaDStream[Array[Byte]] = {
+ tubeProvider.bytesStream(config, numReceiver)
+ }
+
+ def textStream(
+ config: ConsumerConf,
+ numReceiver: Int): JavaDStream[String] = {
+ tubeProvider.textStream(config, numReceiver)
+ }
+
+ def saveBytesStreamToTDBank(
+ dstream: JavaDStream[Array[Byte]],
+ config: ProducerConf): Unit = {
+ tubeProvider.saveBytesStreamToTDBank(dstream.dstream, config)
+ }
+
+ def saveTextStreamToTDBank(
+ dstream: JavaDStream[String],
+ config: ProducerConf): Unit = {
+ tubeProvider.saveTextStreamToTDBank(dstream.dstream, config)
+ }
+}
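
JavaTubeMQProvider simply delegates to TubeMQProvider and hands JavaDStream results back to Java callers. A usage sketch (written in Scala for consistency with the rest of the module; endpoints are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.Durations
    import org.apache.spark.streaming.api.java.JavaStreamingContext
    import org.apache.tubemq.connector.spark.{JavaTubeMQProvider, TubeMQConsumerConf}

    val jssc = new JavaStreamingContext(
      new SparkConf().setAppName("tubemq-java-demo"), Durations.seconds(5))
    val provider = new JavaTubeMQProvider(jssc)
    val conf = new TubeMQConsumerConf()
      .setMaster("tubemq-master:8715") // placeholder
      .setGroup("demo-group")
      .setTopic("demo-topic")
    val bytes = provider.bytesStream(conf, 2) // JavaDStream[Array[Byte]]
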
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ProducerConf.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ProducerConf.scala
new file mode 100644
index 0000000..7394f40
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ProducerConf.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+// Hold sender common parameters
+abstract class ProducerConf extends Serializable {
+}
+
+class TubeMQProducerConf extends ProducerConf {
+ private var _topic: String = _
+ def topic: String = _topic
+ def setTopic(value: String): this.type = {
+ _topic = value
+ this
+ }
+
+ private var _masterHostAndPort: String = _
+ def masterHostAndPort: String = _masterHostAndPort
+ def setMasterHostAndPort(value: String): this.type = {
+ _masterHostAndPort = value
+ this
+ }
+
+ private var _timeout: Long = 20000 // 20s
+ def timeout: Long = _timeout
+ def setTimeout(value: Long): this.type = {
+ _timeout = value
+ this
+ }
+
+ private var _maxRetryTimes: Int = 3
+ def maxRetryTimes: Int = _maxRetryTimes
+ def setMaxRetryTimes(value: Int): this.type = {
+ _maxRetryTimes = value
+ this
+ }
+
+
+ private var _exitOnException: Boolean = true
+ def exitOnException: Boolean = _exitOnException
+ def setExitOnException(value: Boolean): this.type = {
+ _exitOnException = value
+ this
+ }
+
+ // for python api
+ def buildFrom(
+ topic: String,
+ masterHostAndPort: String,
+ timeout: Int,
+ maxRetryTimes: Int,
+ exitOnException: Boolean): this.type = {
+ _topic = topic
+ _masterHostAndPort = masterHostAndPort
+ _timeout = timeout
+ _maxRetryTimes = maxRetryTimes
+ _exitOnException = exitOnException
+ this
+ }
+}
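
TubeMQProducerConf mirrors the consumer configuration. A sketch with placeholder values; exitOnException controls whether a failed send aborts the Spark task (via SparkException) or is only logged:

    val producerConf = new TubeMQProducerConf()
      .setTopic("demo-topic")                     // placeholder topic
      .setMasterHostAndPort("tubemq-master:8715") // placeholder address
      .setTimeout(20000L)
      .setMaxRetryTimes(3)
      .setExitOnException(false) // log send failures instead of failing the task
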
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQConsumer.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQConsumer.scala
new file mode 100644
index 0000000..88881ce
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQConsumer.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import java.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+import org.apache.tubemq.client.config.{ConsumerConfig, TubeClientConfig}
+import org.apache.tubemq.client.consumer.MessageListener
+import org.apache.tubemq.client.factory.{MessageSessionFactory, TubeSingleSessionFactory}
+import org.apache.tubemq.corebase.Message
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class TubeMQConsumer(
+ master: String,
+ group: String,
+ topic: String,
+ filterAttrs: Seq[String],
+ filterAttrId: String,
+ filterOnRemote: Boolean,
+ includeAttrId: String,
+ consumeFromMaxOffset: Boolean,
+ storageLevel: StorageLevel)
+ extends Receiver[Array[Byte]](storageLevel) {
+ self =>
+
+ def this(receiverConfig: TubeMQConsumerConf) = {
+ this(receiverConfig.master,
+ receiverConfig.group,
+ receiverConfig.topic,
+ receiverConfig.filterAttrs,
+ receiverConfig.filterAttrId,
+ receiverConfig.filterOnRemote,
+ receiverConfig.includeAttrId,
+ receiverConfig.consumeFromMaxOffset,
+ receiverConfig.storageLevel)
+ }
+
+ require(master != null, "'master' must be set.")
+ require(group != null, "'group' must be set.")
+ require(topic != null, "'topic' must be set.")
+
+ @transient
+ var messageSessionFactory: MessageSessionFactory = null
+
+ override def onStart(): Unit = {
+ val config = new TubeClientConfig(master)
+ messageSessionFactory = new TubeSingleSessionFactory(config)
+ val consumerConfig = new ConsumerConfig(master, group)
+ val consumeModel = if (consumeFromMaxOffset) 1 else 0
+ consumerConfig.setConsumeModel(consumeModel)
+ val consumer = messageSessionFactory.createPushConsumer(consumerConfig)
+ if (filterOnRemote && filterAttrs != null) {
+ consumer.subscribe(topic, new util.TreeSet[String](filterAttrs), new SimpleMessageListener(topic, filterAttrs,
+ filterAttrId, includeAttrId))
+ } else {
+ consumer.subscribe(topic, null, new SimpleMessageListener(topic, filterAttrs, filterAttrId, includeAttrId))
+ }
+ consumer.completeSubscribe()
+ }
+
+ override def onStop(): Unit = {
+ if (messageSessionFactory != null) {
+ messageSessionFactory.shutdown()
+ messageSessionFactory = null
+ }
+ }
+
+ class SimpleMessageListener(topic: String, filterAttrs: Seq[String], filterAttrId: String, includeAttrId: String)
+ extends MessageListener {
+
+ override def receiveMessages(messages: util.List[Message]): Unit = {
+ messages.asScala.foreach(msg => processMessage(msg))
+ }
+
+ def processMessage(message: Message): Unit = {
+ if (!topic.equals(message.getTopic)) {
+ throw new SparkException(
+ s"Topic mismatch: expected $topic, but the message topic is ${message.getTopic}")
+ }
+
+ // Hand the payload to Spark; persistence follows the receiver's StorageLevel.
+ self.store(message.getData)
+ }
+
+ override def getExecutor = null
+
+ override def stop() = {}
+ }
+}
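
TubeMQConsumer is a plain Spark Receiver, so it can also be used directly; TubeMQProvider.bytesStream does the same thing internally, creating numReceiver instances and unioning them. A sketch, assuming an existing StreamingContext ssc and the consumerConf from the earlier example:

    import org.apache.tubemq.connector.spark.TubeMQConsumer

    // One receiver per element; Spark schedules each onto an executor core.
    val streams = (1 to 3).map(_ => ssc.receiverStream(new TubeMQConsumer(consumerConf)))
    val merged = ssc.union(streams)
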
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQFunctions.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQFunctions.scala
new file mode 100644
index 0000000..1b10706
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQFunctions.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkEnv
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.language.implicitConversions
+
+object TubeMQFunctions {
+
+ class ArrayByteRDDFunctions(rdd: RDD[Array[Byte]]) extends Serializable {
+ def saveToTube(config: ProducerConf): Unit = {
+ SparkEnv.get.conf.set("spark.task.maxFailures", "1")
+ SparkEnv.get.conf.set("spark.speculation", "false")
+ config match {
+ case conf: TubeMQProducerConf =>
+ val sender = new TubeMQProducer(conf)
+ val writer = (iter: Iterator[Array[Byte]]) => {
+ sender.start()
+ try {
+ while (iter.hasNext) {
+ val record = iter.next()
+ sender.send(record)
+ }
+ } finally {
+ sender.close()
+ }
+ }
+ rdd.context.runJob(rdd, writer)
+
+ case _ => throw new UnsupportedOperationException("Unknown SenderConfig")
+ }
+ }
+ }
+
+ class StringRDDFunctions(rdd: RDD[String]) extends Serializable {
+ def saveToTube(config: ProducerConf): Unit = {
+ rdd.map(_.getBytes("UTF-8")).saveToTube(config)
+ }
+ }
+
+ class ArrayByteDStreamFunctions(dstream: DStream[Array[Byte]]) extends Serializable {
+ def saveRDD(rdd: RDD[Array[Byte]], sender: TubeMQProducer): Unit = {
+ val writer = (iter: Iterator[Array[Byte]]) => {
+ try {
+ sender.start()
+ while (iter.hasNext) {
+ val record = iter.next()
+ sender.send(record)
+ }
+ }
+ finally {
+ sender.close()
+ }
+ }
+ rdd.context.runJob(rdd, writer)
+ }
+
+ def saveToTube(config: ProducerConf): Unit = {
+ SparkEnv.get.conf.set("spark.task.maxFailures", "1")
+ SparkEnv.get.conf.set("spark.speculation", "false")
+ val sender: TubeMQProducer = config match {
+ case conf: TubeMQProducerConf => new TubeMQProducer(conf)
+ case _ => throw new UnsupportedOperationException("Unknown SenderConfig")
+ }
+ val saveFunc = (rdd: RDD[Array[Byte]], time: Time) => {
+ saveRDD(rdd, sender)
+ }
+ dstream.foreachRDD(saveFunc)
+ }
+ }
+
+ class StringDStreamFunctions(dstream: DStream[String]) extends Serializable {
+ def saveToTube(config: ProducerConf): Unit = {
+ dstream.map(_.getBytes("UTF-8")).saveToTube(config)
+ }
+ }
+
+ implicit def rddToArrayByteRDDFunctions(rdd: RDD[Array[Byte]]): ArrayByteRDDFunctions = {
+ new ArrayByteRDDFunctions(rdd)
+ }
+
+ implicit def rddToStringRDDFunctions(rdd: RDD[String]): StringRDDFunctions = {
+ new StringRDDFunctions(rdd)
+ }
+
+ implicit def dStreamToArrayByteDStreamFunctions(
+ dstream: DStream[Array[Byte]]): ArrayByteDStreamFunctions = {
+ new ArrayByteDStreamFunctions(dstream)
+ }
+
+ implicit def dStreamToStringDStreamFunctions(
+ dstream: DStream[String]): StringDStreamFunctions = {
+ new StringDStreamFunctions(dstream)
+ }
+}
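
Importing TubeMQFunctions._ brings the implicit wrappers into scope, so saveToTube reads like a native method on RDDs and DStreams. A sketch, reusing the producerConf from the earlier example:

    import org.apache.spark.streaming.dstream.DStream
    import org.apache.tubemq.connector.spark.TubeMQFunctions._
    import org.apache.tubemq.connector.spark.TubeMQProducerConf

    def forwardToTube(lines: DStream[String], conf: TubeMQProducerConf): Unit = {
      // Resolved through the implicit StringDStreamFunctions wrapper.
      lines.saveToTube(conf)
    }
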
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProducer.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProducer.scala
new file mode 100644
index 0000000..6ad521f
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProducer.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import java.util.HashSet
+
+import org.apache.spark.SparkException
+
+import org.apache.tubemq.client.config.TubeClientConfig
+import org.apache.tubemq.client.factory.TubeSingleSessionFactory
+import org.apache.tubemq.client.producer.{MessageProducer, MessageSentCallback, MessageSentResult}
+import org.apache.tubemq.corebase.Message
+import org.slf4j.{Logger, LoggerFactory}
+
+private[spark] class TubeMQProducer(
+ topic: String,
+ masterHostAndPort: String,
+ maxRetryTimes: Int,
+ exitOnException: Boolean)
+ extends Serializable {
+
+ @transient private lazy val LOG: Logger = LoggerFactory.getLogger(classOf[TubeMQProducer])
+
+ private var producer: MessageProducer = _
+ private var sessionFactory: TubeSingleSessionFactory = _
+
+ def this(producerConfig: TubeMQProducerConf) = {
+ this(producerConfig.topic, producerConfig.masterHostAndPort, producerConfig.maxRetryTimes, producerConfig.exitOnException)
+ }
+
+ def start(): Unit = {
+ val clientConfig = new TubeClientConfig(masterHostAndPort)
+ sessionFactory = new TubeSingleSessionFactory(clientConfig)
+ producer = sessionFactory.createProducer()
+ val hashSet: HashSet[String] = new HashSet[String]
+ hashSet.add(topic)
+ producer.publish(hashSet)
+ }
+
+ def send(body: Array[Byte]): Unit = {
+ val message: Message = new Message(topic, body)
+ producer.sendMessage(message, new MessageSentCallback {
+ override def onMessageSent(sendResult: MessageSentResult): Unit = {
+ if (!sendResult.isSuccess) {
+ // rollback to sync
+ sendSync(message)
+ }
+ }
+
+ override def onException(throwable: Throwable): Unit = {
+ if (exitOnException) {
+ throw new SparkException("Sender message exception", throwable)
+ } else {
+ LOG.warn("Sender message exception", throwable)
+ }
+ }
+
+ })
+ }
+
+ private def sendSync(message: Message): Unit = {
+ var success = false
+ var retryTimes = 0
+ while (!success && retryTimes < maxRetryTimes) {
+ retryTimes += 1
+ val sendResult = producer.sendMessage(message)
+ if (sendResult.isSuccess) {
+ success = true
+ }
+ // On failure, fall through and retry until maxRetryTimes is exhausted.
+ }
+
+ if (!success) {
+ val error = s"Sender message exception: exceed maxRetryTimes($maxRetryTimes)"
+ if (exitOnException) {
+ throw new SparkException(error)
+ } else {
+ LOG.warn(error)
+ }
+ }
+ }
+
+ def close(): Unit = {
+ try {
+ if (producer != null) {
+ producer.shutdown()
+ producer = null
+ }
+ if (sessionFactory != null) {
+ sessionFactory.shutdown()
+ sessionFactory = null
+ }
+ } catch {
+ case e: Exception =>
+ LOG.error("Shutdown producer error", e)
+ }
+ }
+
+}
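
TubeMQProducer is private[spark], so applications drive it indirectly through saveToTube. For illustration only, this is the per-partition lifecycle that the writer closure in TubeMQFunctions runs:

    // Sketch of the per-partition write path; producerConf as in the earlier example.
    val writer = (iter: Iterator[Array[Byte]]) => {
      val sender = new TubeMQProducer(producerConf) // package-private in the real module
      sender.start()              // create the session factory and publish the topic
      try {
        iter.foreach(sender.send) // async send; falls back to sync retries on failure
      } finally {
        sender.close()            // shut down the producer and the session factory
      }
    }
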
diff --git a/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProvider.scala b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProvider.scala
new file mode 100644
index 0000000..c529958
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeMQProvider.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+
+import TubeMQFunctions._
+
+class TubeMQProvider(val ssc: StreamingContext) {
+
+ /**
+ * Receive from TubeMQ
+ * @param config the configuration of receiver's consumer
+ * @param numReceiver the number of receivers
+ * @return DStream[ Array[Byte] ]
+ */
+ def bytesStream(
+ config: ConsumerConf,
+ numReceiver: Int): DStream[Array[Byte]] = {
+ require(numReceiver >= 1, s"'numReceiver' must be at least 1, got: $numReceiver")
+ val streams = config match {
+ case conf: TubeMQConsumerConf =>
+ (1 to numReceiver).map { _ =>
+ ssc.receiverStream(new TubeMQConsumer(conf))
+ }
+ case _ =>
+ throw new UnsupportedOperationException("Unknown receiver config.")
+ }
+ ssc.union(streams)
+ }
+
+ /**
+ * Receive from TubeMQ
+ * @param config the configuration of receiver's consumer
+ * @param numReceiver the number of receivers
+ * @return DStream[String]
+ */
+ def textStream(
+ config: ConsumerConf,
+ numReceiver: Int): DStream[String] = {
+ bytesStream(config, numReceiver).map(x => new String(x, "UTF-8"))
+ }
+
+ /**
+ * Send to TubeMQ
+ * @param dstream the data to be sent
+ * @param config sender config
+ */
+ def saveBytesStreamToTDBank(dstream: DStream[Array[Byte]], config: ProducerConf): Unit = {
+ dstream.saveToTube(config)
+ }
+
+ /**
+ * Send to TubeMQ
+ * @param dstream the data to be sent
+ * @param config sender config
+ */
+ def saveTextStreamToTDBank(dstream: DStream[String], config: ProducerConf): Unit = {
+ dstream.saveToTube(config)
+ }
+}
+
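
Putting it all together, a round trip consumes one topic, transforms the stream, and produces to another. A minimal end-to-end sketch, reusing the placeholder ssc, consumerConf, and producerConf from the earlier examples (the save method keeps its legacy TDBank-suffixed name):

    val provider = new TubeMQProvider(ssc)
    val in = provider.textStream(consumerConf, 2)        // consume
    val upper = in.map(_.toUpperCase)                    // transform
    provider.saveTextStreamToTDBank(upper, producerConf) // produce
    ssc.start()
    ssc.awaitTermination()
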