You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by GitBox <gi...@apache.org> on 2020/07/01 09:07:01 UTC

[GitHub] [incubator-tubemq] guangxuCheng commented on a change in pull request #176: Support TubeMQ connector for Apache Spark Streaming

guangxuCheng commented on a change in pull request #176:
URL: https://github.com/apache/incubator-tubemq/pull/176#discussion_r448219137



##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/JavaTubeProvider.scala
##########
@@ -0,0 +1,35 @@
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+
+class JavaTubeProvider private(tdbank: TubeProvider) {

Review comment:
       Rename to JavaTubeMQProvider

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/ConsumerConf.scala
##########
@@ -0,0 +1,141 @@
+package org.apache.tubemq.connector.spark

Review comment:
       Need to add the Apache license header at the top of the file

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeConsumer.scala
##########
@@ -0,0 +1,94 @@
+package org.apache.tubemq.connector.spark

Review comment:
       Add the Apache license header

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeProvider.scala
##########
@@ -0,0 +1,61 @@
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+
+import TubeFunctions._
+
+class TubeProvider(val ssc: StreamingContext) {

Review comment:
       Rename to TubeMQProvider

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeProducer.scala
##########
@@ -0,0 +1,101 @@
+package org.apache.tubemq.connector.spark
+
+import java.util.HashSet
+
+import org.apache.spark.SparkException
+
+import org.apache.tubemq.client.config.TubeClientConfig
+import org.apache.tubemq.client.factory.TubeSingleSessionFactory
+import org.apache.tubemq.client.producer.{MessageProducer, MessageSentCallback, MessageSentResult}
+import org.apache.tubemq.corebase.Message
+import org.slf4j.{Logger, LoggerFactory}
+
+private[spark] class TubeProducer(

Review comment:
       Rename to TubeMQProducer

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeFunctions.scala
##########
@@ -0,0 +1,97 @@
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkEnv
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.language.implicitConversions
+
+object TubeFunctions {
+

Review comment:
       Rename to TubeMQFunctions

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeProducer.scala
##########
@@ -0,0 +1,101 @@
+package org.apache.tubemq.connector.spark

Review comment:
       Add the Apache license header

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeFunctions.scala
##########
@@ -0,0 +1,97 @@
+package org.apache.tubemq.connector.spark

Review comment:
       Add the Apache license header

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/TubeFunctions.scala
##########
@@ -0,0 +1,97 @@
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkEnv
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.language.implicitConversions
+
+object TubeFunctions {
+
+  class ArrayByteRDDFunctins(rdd: RDD[Array[Byte]]) extends Serializable {
+    def saveToTube(config: ProducerConf): Unit = {
+      SparkEnv.get.conf.set("spark.task.maxFailures", "1")

Review comment:
       Rename `saveToTube` to `saveToTubeMQ`

##########
File path: tubemq-connectors/tubemq-connector-spark/src/main/scala/org/apache/tubemq/connector/spark/JavaTubeProvider.scala
##########
@@ -0,0 +1,35 @@
+package org.apache.tubemq.connector.spark
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+
+class JavaTubeProvider private(tdbank: TubeProvider) {

Review comment:
       Please don't use `tdbank` anywhere in any of the files




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org