Posted to dev@spark.apache.org by Chan Chor Pang <ch...@indetail.co.jp> on 2016/12/02 02:19:02 UTC

Re: REST api for monitoring Spark Streaming

Hi everyone,

I have done the coding and created the PR.
The implementation is straightforward and similar to the API in spark-core,
but we still need someone with a streaming background to verify the patch,
just to make sure everything is OK.

So, could anyone please help?
https://github.com/apache/spark/pull/16000
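
For reviewers who want a quick smoke test, this is roughly what I run
against a locally running streaming app. It is only a minimal sketch:
the port and the exact path and field names are taken from my test
code quoted below, so treat them as assumptions until the PR settles.

    import scala.io.Source

    object StreamingApiSmokeTest {
      def main(args: Array[String]): Unit = {
        // Assumes the driver UI is on the default port 4040 and the
        // handler is mounted at /streamingapi/v1 as in the test code
        // quoted below.
        val url = "http://localhost:4040/streamingapi/v1/streaminginfo"
        println(Source.fromURL(url).mkString)
      }
    }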


On 11/8/16 1:46 PM, Chan Chor Pang wrote:
>
> Thank you
>
> This should take me at least a few days; I will let you know as soon
> as the PR is ready.
>
>
> On 11/8/16 11:44 AM, Tathagata Das wrote:
>> This may be a good addition. I suggest you read our guidelines on 
>> contributing code to Spark.
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-PreparingtoContributeCodeChanges
>>
>> It's a long document, but it should have everything you need to
>> figure out how to contribute your changes. I hope to see your
>> changes in a GitHub PR soon!
>>
>> TD
>>
>> On Mon, Nov 7, 2016 at 5:30 PM, Chan Chor Pang 
>> <chin-sh@indetail.co.jp> wrote:
>>
>>     hi everyone
>>
>>     it seems that not many people are interested in creating an API
>>     for Streaming.
>>     Nevertheless, I still really want the API for monitoring,
>>     so I tried to see if I could implement it on my own.
>>
>>     After some testing, I believe I can achieve the goal by:
>>     1. implementing a package (org.apache.spark.streaming.status.api.v1)
>>     that serves the same purpose as org.apache.spark.status.api.v1,
>>     2. registering the API path through StreamingTab,
>>     and 3. retrieving the streaming information through
>>     StreamingJobProgressListener.
>>
>>     My main concern now is whether my implementation will be able
>>     to be merged upstream.
>>
>>     I am new to open source projects, so could anyone please shed
>>     some light on this?
>>     How should I proceed so that my implementation can be merged
>>     upstream?
>>
>>
>>     Here is my test code, based on v1.6.0:
>>     ###################################
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
>>     new file mode 100644
>>     index 0000000..690e2d8
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
>>     @@ -0,0 +1,68 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import java.io.OutputStream
>>     +import java.lang.annotation.Annotation
>>     +import java.lang.reflect.Type
>>     +import java.text.SimpleDateFormat
>>     +import java.util.{Calendar, SimpleTimeZone}
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.{MediaType, MultivaluedMap}
>>     +import javax.ws.rs.ext.{MessageBodyWriter, Provider}
>>     +
>>     +import com.fasterxml.jackson.annotation.JsonInclude
>>     +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
>>     +
>>     +@Provider
>>     +@Produces(Array(MediaType.APPLICATION_JSON))
>>     +private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
>>     +
>>     +  val mapper = new ObjectMapper() {
>>     +    override def writeValueAsString(t: Any): String = {
>>     +      super.writeValueAsString(t)
>>     +    }
>>     +  }
>>     +  mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
>>     +  mapper.enable(SerializationFeature.INDENT_OUTPUT)
>>     +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
>>     +  mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
>>     +
>>     +  override def isWriteable(
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType): Boolean = {
>>     +      true
>>     +  }
>>     +
>>     +  override def writeTo(
>>     +      t: Object,
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType,
>>     +      multivaluedMap: MultivaluedMap[String, AnyRef],
>>     +      outputStream: OutputStream): Unit = {
>>     +    t match {
>>     +      //case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
>>     +      case _ => mapper.writeValue(outputStream, t)
>>     +    }
>>     +  }
>>     +
>>     +  override def getSize(
>>     +      t: Object,
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType): Long = {
>>     +    -1L
>>     +  }
>>     +}
>>     +
>>     +private[spark] object JacksonMessageWriter {
>>     +  def makeISODateFormat: SimpleDateFormat = {
>>     +    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
>>     +    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
>>     +    iso8601.setCalendar(cal)
>>     +    iso8601
>>     +  }
>>     +}
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
>>     new file mode 100644
>>     index 0000000..f4e43dd
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
>>     @@ -0,0 +1,74 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import org.apache.spark.status.api.v1.UIRoot
>>     +import org.eclipse.jetty.server.handler.ContextHandler
>>     +import org.eclipse.jetty.servlet.ServletContextHandler
>>     +import org.eclipse.jetty.servlet.ServletHolder
>>     +
>>     +import com.sun.jersey.spi.container.servlet.ServletContainer
>>     +
>>     +import javax.servlet.ServletContext
>>     +import javax.ws.rs.Path
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.Context
>>     +import org.apache.spark.streaming.ui.StreamingJobProgressListener
>>     +
>>     +
>>     +@Path("/v1")
>>     +private[v1] class StreamingApiRootResource extends UIRootFromServletContext{
>>     +
>>     +  @Path("streaminginfo")
>>     +  def getStreamingInfo(): StreamingInfoResource = {
>>     +    new StreamingInfoResource(uiRoot,listener)
>>     +  }
>>     +
>>     +}
>>     +
>>     +private[spark] object StreamingApiRootResource {
>>     +
>>     +  def getServletHandler(uiRoot: UIRoot, listener: StreamingJobProgressListener): ServletContextHandler = {
>>     +
>>     +    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
>>     +    jerseyContext.setContextPath("/streamingapi")
>>     +    val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
>>     +    holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
>>     +      "com.sun.jersey.api.core.PackagesResourceConfig")
>>     +    holder.setInitParameter("com.sun.jersey.config.property.packages",
>>     +      "org.apache.spark.streaming.status.api.v1")
>>     +    //holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
>>     +    //  classOf[SecurityFilter].getCanonicalName)
>>     +    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
>>     +    UIRootFromServletContext.setListener(jerseyContext, listener)
>>     +    jerseyContext.addServlet(holder, "/*")
>>     +    jerseyContext
>>     +  }
>>     +}
>>     +
>>     +private[v1] object UIRootFromServletContext {
>>     +
>>     +  private val attribute = getClass.getCanonicalName
>>     +
>>     +  def setListener(contextHandler: ContextHandler, listener: StreamingJobProgressListener): Unit = {
>>     +    contextHandler.setAttribute(attribute + "_listener", listener)
>>     +  }
>>     +
>>     +  def getListener(context: ServletContext): StreamingJobProgressListener = {
>>     +    context.getAttribute(attribute + "_listener").asInstanceOf[StreamingJobProgressListener]
>>     +  }
>>     +
>>     +  def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
>>     +    contextHandler.setAttribute(attribute, uiRoot)
>>     +  }
>>     +
>>     +  def getUiRoot(context: ServletContext): UIRoot = {
>>     +    context.getAttribute(attribute).asInstanceOf[UIRoot]
>>     +  }
>>     +}
>>     +
>>     +private[v1] trait UIRootFromServletContext {
>>     +  @Context
>>     +  var servletContext: ServletContext = _
>>     +
>>     +  def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
>>     +  def listener: StreamingJobProgressListener = UIRootFromServletContext.getListener(servletContext)
>>     +}
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
>>     new file mode 100644
>>     index 0000000..d5fc11b
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
>>     @@ -0,0 +1,22 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import org.apache.spark.status.api.v1.SimpleDateParam
>>     +import org.apache.spark.status.api.v1.UIRoot
>>     +
>>     +import javax.ws.rs.GET
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.MediaType
>>     +import org.apache.spark.streaming.StreamingContext
>>     +import org.apache.spark.streaming.ui.StreamingJobProgressListener
>>     +
>>     +@Produces(Array(MediaType.APPLICATION_JSON))
>>     +private[v1] class StreamingInfoResource(uiRoot: UIRoot, listener: StreamingJobProgressListener){
>>     +
>>     +  @GET
>>     +  def streamingInfo(): Iterator[StreamingInfo] = {
>>     +    val v = listener.numTotalCompletedBatches
>>     +    Iterator(new StreamingInfo("testname", v))
>>     +
>>     +  }
>>     +}
>>     \ No newline at end of file
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
>>     new file mode 100644
>>     index 0000000..958dd41
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
>>     @@ -0,0 +1,6 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +class StreamingInfo private[streaming](
>>     +    val name: String,
>>     +    val completedBatchCount: Long)
>>     +
>>     \ No newline at end of file
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     index bc53f2a..877abf4 100644
>>     --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     @@ -22,6 +22,7 @@ import org.apache.spark.streaming.StreamingContext
>>      import org.apache.spark.ui.{SparkUI, SparkUITab}
>>
>>      import StreamingTab._
>>     +import org.apache.spark.streaming.status.api.v1.StreamingApiRootResource
>>
>>      /**
>>       * Spark Web UI tab that shows statistics of a streaming job.
>>     @@ -39,6 +40,9 @@ private[spark] class StreamingTab(val ssc: StreamingContext)
>>        ssc.sc.addSparkListener(listener)
>>        attachPage(new StreamingPage(this))
>>        attachPage(new BatchPage(this))
>>     +
>>     +  // register streaming api
>>     +  parent.attachHandler(StreamingApiRootResource.getServletHandler(parent, listener))
>>
>>        def attach() {
>>          getSparkUI(ssc).attachTab(this)
>>
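>>     With the patch applied and a streaming application running, the
>>     endpoint can be exercised with a plain HTTP GET. The response
>>     below is only illustrative: it assumes the default driver UI
>>     port 4040, and the batch count is simply whatever
>>     numTotalCompletedBatches returns at request time.
>>
>>     $ curl http://localhost:4040/streamingapi/v1/streaminginfo
>>     [ {
>>       "name" : "testname",
>>       "completedBatchCount" : 42
>>     } ]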
>>
>>     On 9/14/16 10:13 AM, Chan Chor Pang wrote:
>>
>>         Hi everyone,
>>
>>         I tried to monitor our streaming application using the Spark
>>         REST interface, only to find that there is no such thing for
>>         Streaming.
>>
>>         I wonder if anyone is already working on this, or should I
>>         just start implementing my own?
>>
>>
>>
>>     ---------------------------------------------------------------------
>>     To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>>
>>
>

-- 
---*------------------------------------------------*---*---*---*---
INDETAIL Inc.
Nearshore Integrated Services Division
Game Services Department
Chan Chor Pang
E-mail: chin-sh@indetail.co.jp
URL: http://www.indetail.co.jp

[Sapporo Head Office / LABO / LABO2]
060-0042
Kitako Center Building, Odori Nishi 9-chome 3-33, Chuo-ku, Sapporo
(Sapporo Head Office / LABO2: 2F, LABO: 9F)
TEL: 011-206-9235  FAX: 011-206-9236

[Tokyo Branch]
108-0014
Cross Office Mita, 5-29-20 Shiba, Minato-ku, Tokyo
TEL: 03-6809-6502  FAX: 03-6809-6504

[Nagoya Satellite]
460-0002
NAYUTA BLD, 3-17-24 Marunouchi, Naka-ku, Nagoya, Aichi
TEL: 052-971-0086