Posted to dev@spark.apache.org by Chan Chor Pang <ch...@indetail.co.jp> on 2016/09/14 01:13:57 UTC

REST api for monitoring Spark Streaming

Hi everyone,

I am trying to monitor our streaming application through the Spark REST
interface, only to find that there is no such API for Streaming.

Is anyone already working on this, or should I just start implementing
my own?

-- 
BR
Peter Chan


---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: REST api for monitoring Spark Streaming

Posted by Chan Chor Pang <ch...@indetail.co.jp>.
Hi everyone,

I have finished the coding and created the PR.
The implementation is straightforward and similar to the API in spark-core,
but we still need someone with a streaming background to verify the patch,
just to make sure everything is OK.

Could anyone please help?
https://github.com/apache/spark/pull/16000
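
A minimal, dependency-free sketch of the design discussed in this thread: a
listener accumulates batch progress, and a REST resource only reads from it.
Every name below (BatchCountListener, StreamingInfoJson, the hand-written
JSON) is a hypothetical stand-in, not Spark's or the PR's actual code; the
real patch relies on StreamingJobProgressListener, Jersey and Jackson.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical payload type, mirroring the two fields used in the test patch.
final case class StreamingInfo(name: String, completedBatchCount: Long)

// Stand-in for StreamingJobProgressListener: it is notified about completed
// batches and keeps a thread-safe running count.
final class BatchCountListener {
  private val completed = new AtomicLong(0L)

  def onBatchCompleted(): Unit = {
    completed.incrementAndGet()
    ()
  }

  def numTotalCompletedBatches: Long = completed.get()
}

// Stand-in for the REST resource: it never mutates state, it only reads
// the current values from the listener.
final class StreamingInfoResource(appName: String, listener: BatchCountListener) {
  def streamingInfo(): StreamingInfo =
    StreamingInfo(appName, listener.numTotalCompletedBatches)
}

object StreamingInfoJson {
  // Hand-rolled JSON for this one flat type (assumption: the name needs only
  // backslash and quote escaping). The patch uses Jackson instead.
  def render(info: StreamingInfo): String = {
    val escapedName = info.name.replace("\\", "\\\\").replace("\"", "\\\"")
    s"""{"name":"$escapedName","completedBatchCount":${info.completedBatchCount}}"""
  }
}
```

After three calls to onBatchCompleted(), rendering the resource's result for
an application named "demo" gives {"name":"demo","completedBatchCount":3}.
Keeping the resource read-only means the HTTP layer cannot interfere with the
bookkeeping done by the listener.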


On 11/8/16 1:46 PM, Chan Chor Pang wrote:
>
> Thank you
>
> This should take me at least a few days; I will let you know as soon 
> as the PR is ready.
>
>
> On 11/8/16 11:44 AM, Tathagata Das wrote:
>> This may be a good addition. I suggest you read our guidelines on 
>> contributing code to Spark.
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-PreparingtoContributeCodeChanges
>>
>> It's a long document, but it should have everything you need to figure out 
>> how to contribute your changes. I hope to see your changes in a 
>> GitHub PR soon!
>>
>> TD
>>
>> On Mon, Nov 7, 2016 at 5:30 PM, Chan Chor Pang <chin-sh@indetail.co.jp> wrote:
>>
>>     Hi everyone,
>>
>>     It seems that not many people are interested in creating an API
>>     for Streaming.
>>     Nevertheless, I still really want an API for monitoring,
>>     so I tried to see whether I could implement it on my own.
>>
>>     After some testing,
>>     I believe I can achieve the goal by:
>>     1. implementing a package (org.apache.spark.streaming.status.api.v1)
>>     that serves the same purpose as org.apache.spark.status.api.v1,
>>     2. registering the API path through StreamingTab,
>>     and 3. retrieving the streaming information through
>>     StreamingJobProgressListener.
>>
>>     My main concern now is whether my implementation can be
>>     merged upstream.
>>
>>     I'm new to open source projects, so could anyone please give me
>>     some guidance?
>>     How should I proceed so that my implementation can be
>>     merged upstream?
>>
>>
>>     Here is my test code, based on v1.6.0:
>>     ###################################
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
>>     new file mode 100644
>>     index 0000000..690e2d8
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
>>     @@ -0,0 +1,68 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import java.io.OutputStream
>>     +import java.lang.annotation.Annotation
>>     +import java.lang.reflect.Type
>>     +import java.text.SimpleDateFormat
>>     +import java.util.{Calendar, SimpleTimeZone}
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.{MediaType, MultivaluedMap}
>>     +import javax.ws.rs.ext.{MessageBodyWriter, Provider}
>>     +
>>     +import com.fasterxml.jackson.annotation.JsonInclude
>>     +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
>>     +
>>     +@Provider
>>     +@Produces(Array(MediaType.APPLICATION_JSON))
>>     +private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
>>     +
>>     +  val mapper = new ObjectMapper() {
>>     +    override def writeValueAsString(t: Any): String = {
>>     +      super.writeValueAsString(t)
>>     +    }
>>     +  }
>>     +  mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
>>     +  mapper.enable(SerializationFeature.INDENT_OUTPUT)
>>     +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
>>     +  mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
>>     +
>>     +  override def isWriteable(
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType): Boolean = {
>>     +      true
>>     +  }
>>     +
>>     +  override def writeTo(
>>     +      t: Object,
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType,
>>     +      multivaluedMap: MultivaluedMap[String, AnyRef],
>>     +      outputStream: OutputStream): Unit = {
>>     +    t match {
>>     +      //case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
>>     +      case _ => mapper.writeValue(outputStream, t)
>>     +    }
>>     +  }
>>     +
>>     +  override def getSize(
>>     +      t: Object,
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType): Long = {
>>     +    -1L
>>     +  }
>>     +}
>>     +
>>     +private[spark] object JacksonMessageWriter {
>>     +  def makeISODateFormat: SimpleDateFormat = {
>>     +    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
>>     +    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
>>     +    iso8601.setCalendar(cal)
>>     +    iso8601
>>     +  }
>>     +}
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
>>     new file mode 100644
>>     index 0000000..f4e43dd
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
>>     @@ -0,0 +1,74 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import org.apache.spark.status.api.v1.UIRoot
>>     +import org.eclipse.jetty.server.handler.ContextHandler
>>     +import org.eclipse.jetty.servlet.ServletContextHandler
>>     +import org.eclipse.jetty.servlet.ServletHolder
>>     +
>>     +import com.sun.jersey.spi.container.servlet.ServletContainer
>>     +
>>     +import javax.servlet.ServletContext
>>     +import javax.ws.rs.Path
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.Context
>>     +import org.apache.spark.streaming.ui.StreamingJobProgressListener
>>     +
>>     +
>>     +@Path("/v1")
>>     +private[v1] class StreamingApiRootResource extends UIRootFromServletContext{
>>     +
>>     +  @Path("streaminginfo")
>>     +  def getStreamingInfo(): StreamingInfoResource = {
>>     +    new StreamingInfoResource(uiRoot,listener)
>>     +  }
>>     +
>>     +}
>>     +
>>     +private[spark] object StreamingApiRootResource {
>>     +
>>     +  def getServletHandler(uiRoot: UIRoot, listener:StreamingJobProgressListener): ServletContextHandler = {
>>     +
>>     +    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
>>     +    jerseyContext.setContextPath("/streamingapi")
>>     +    val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
>>     +    holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
>>     +      "com.sun.jersey.api.core.PackagesResourceConfig")
>>     +    holder.setInitParameter("com.sun.jersey.config.property.packages",
>>     +      "org.apache.spark.streaming.status.api.v1")
>>     +    //holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
>>     +    //  classOf[SecurityFilter].getCanonicalName)
>>     +    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
>>     +    UIRootFromServletContext.setListener(jerseyContext, listener)
>>     +    jerseyContext.addServlet(holder, "/*")
>>     +    jerseyContext
>>     +  }
>>     +}
>>     +
>>     +private[v1] object UIRootFromServletContext {
>>     +
>>     +  private val attribute = getClass.getCanonicalName
>>     +
>>     +  def setListener(contextHandler:ContextHandler, listener: StreamingJobProgressListener):Unit={
>>     +   contextHandler.setAttribute(attribute+"_listener", listener)
>>     +  }
>>     +
>>     +  def getListener(context:ServletContext):StreamingJobProgressListener={
>>     +    context.getAttribute(attribute+"_listener").asInstanceOf[StreamingJobProgressListener]
>>     +  }
>>     +
>>     +  def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
>>     +    contextHandler.setAttribute(attribute, uiRoot)
>>     +  }
>>     +
>>     +  def getUiRoot(context: ServletContext): UIRoot = {
>>     +    context.getAttribute(attribute).asInstanceOf[UIRoot]
>>     +  }
>>     +}
>>     +
>>     +private[v1] trait UIRootFromServletContext {
>>     +  @Context
>>     +  var servletContext: ServletContext = _
>>     +
>>     +  def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
>>     +  def listener: StreamingJobProgressListener = UIRootFromServletContext.getListener(servletContext)
>>     +}
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
>>     new file mode 100644
>>     index 0000000..d5fc11b
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
>>     @@ -0,0 +1,22 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import org.apache.spark.status.api.v1.SimpleDateParam
>>     +import org.apache.spark.status.api.v1.UIRoot
>>     +
>>     +import javax.ws.rs.GET
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.MediaType
>>     +import org.apache.spark.streaming.StreamingContext
>>     +import org.apache.spark.streaming.ui.StreamingJobProgressListener
>>     +
>>     +@Produces(Array(MediaType.APPLICATION_JSON))
>>     +private[v1] class StreamingInfoResource(uiRoot: UIRoot, listener: StreamingJobProgressListener){
>>     +
>>     +  @GET
>>     +  def streamingInfo()
>>     +  :Iterator[StreamingInfo]={
>>     +    var v = listener.numTotalCompletedBatches
>>     +    Iterator(new StreamingInfo("testname",v))
>>     +
>>     +  }
>>     +}
>>     \ No newline at end of file
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
>>     new file mode 100644
>>     index 0000000..958dd41
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
>>     @@ -0,0 +1,6 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +class StreamingInfo private[streaming](
>>     +    val name:String,
>>     +    val completedBatchCount:Long)
>>     +
>>     \ No newline at end of file
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     index bc53f2a..877abf4 100644
>>     --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     @@ -22,6 +22,7 @@ import org.apache.spark.streaming.StreamingContext
>>      import org.apache.spark.ui.{SparkUI, SparkUITab}
>>
>>      import StreamingTab._
>>     +import org.apache.spark.streaming.status.api.v1.StreamingApiRootResource
>>
>>      /**
>>       * Spark Web UI tab that shows statistics of a streaming job.
>>     @@ -39,6 +40,9 @@ private[spark] class StreamingTab(val ssc: StreamingContext)
>>        ssc.sc.addSparkListener(listener)
>>        attachPage(new StreamingPage(this))
>>        attachPage(new BatchPage(this))
>>     +
>>     +  //register streaming api
>>     +  parent.attachHandler(StreamingApiRootResource.getServletHandler(parent,listener));
>>
>>        def attach() {
>>          getSparkUI(ssc).attachTab(this)
>>
>>
>>     On 9/14/16 10:13 AM, Chan Chor Pang wrote:
>>
>>         Hi everyone,
>>
>>         I am trying to monitor our streaming application through the
>>         Spark REST interface, only to find that there is no such API
>>         for Streaming.
>>
>>         Is anyone already working on this, or should I just start
>>         implementing my own?
>>
>>
>>
>>
>>
>
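
For anyone who wants to try the quoted test patch, the sketch below works out
the URL at which it would presumably answer. It only combines three values
that appear in the patch (the Jetty context path, the root resource path and
the sub-resource path). StreamingApiPath and its helpers are hypothetical
names, and the host and port are assumptions: 4040 is the default Spark UI
port, which a deployment may override.

```scala
object StreamingApiPath {
  // Join path segments with single slashes, ignoring empty pieces and any
  // leading or trailing slashes on the individual segments.
  def join(segments: String*): String =
    segments
      .flatMap(_.split('/').toSeq)
      .filter(_.nonEmpty)
      .mkString("/", "/", "")

  // Values taken from the quoted patch.
  val contextPath  = "/streamingapi"  // jerseyContext.setContextPath(...)
  val rootResource = "/v1"            // @Path("/v1") on the root resource
  val subResource  = "streaminginfo"  // @Path("streaminginfo") on the locator

  val endpoint: String = join(contextPath, rootResource, subResource)

  // Assumed defaults: the driver UI on localhost, port 4040.
  def url(host: String = "localhost", port: Int = 4040): String =
    s"http://$host:$port$endpoint"
}
```

With the defaults, StreamingApiPath.url() yields
http://localhost:4040/streamingapi/v1/streaminginfo, which parallels the
/api/v1 prefix of the existing spark-core REST API.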

-- 
---*------------------------------------------------*---*---*---*---
INDETAIL Co., Ltd.
Nearshore General Services Division
Game Services Department
Chan Chor Pang (陳 楚鵬)
E-mail :chin-sh@indetail.co.jp
URL : http://www.indetail.co.jp

[Sapporo Head Office / LABO / LABO2]
〒060-0042
3-33 Odori Nishi 9-chome, Chuo-ku, Sapporo
Kitako Center Building
(Sapporo Head Office / LABO2: 2F, LABO: 9F)
TEL: 011-206-9235 FAX: 011-206-9236

[Tokyo Branch]
〒108-0014
5-29-20 Shiba, Minato-ku, Tokyo, Cross Office Mita
TEL: 03-6809-6502 FAX: 03-6809-6504

[Nagoya Satellite Office]
〒460-0002
3-17-24 Marunouchi, Naka-ku, Nagoya, Aichi, NAYUTA BLD
TEL: 052-971-0086


Re: REST api for monitoring Spark Streaming

Posted by Chan Chor Pang <ch...@indetail.co.jp>.
Thank you

This should take me at least a few days; I will let you know as soon 
as the PR is ready.



Re: REST api for monitoring Spark Streaming

Posted by Tathagata Das <ta...@gmail.com>.
This may be a good addition. I suggest you read our guidelines on
contributing code to Spark.

https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-PreparingtoContributeCodeChanges

It's a long document, but it should have everything you need to figure out how
to contribute your changes. I hope to see your changes in a GitHub PR soon!

TD

On Mon, Nov 7, 2016 at 5:30 PM, Chan Chor Pang <ch...@indetail.co.jp>
wrote:

> Hi everyone,
>
> It seems that not many people are interested in creating an API for
> Streaming.
> Nevertheless, I still really want an API for monitoring,
> so I tried to see whether I could implement it on my own.
>
> After some testing,
> I believe I can achieve the goal by:
> 1. implementing a package (org.apache.spark.streaming.status.api.v1) that
> serves the same purpose as org.apache.spark.status.api.v1,
> 2. registering the API path through StreamingTab,
> and 3. retrieving the streaming information through
> StreamingJobProgressListener.
>
> My main concern now is whether my implementation can be merged
> upstream.
>
> I'm new to open source projects, so could anyone please give me some guidance?
> How should I proceed so that my implementation can be merged
> upstream?
>
>
> Here is my test code, based on v1.6.0:
> ###################################
> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
> new file mode 100644
> index 0000000..690e2d8
> --- /dev/null
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
> @@ -0,0 +1,68 @@
> +package org.apache.spark.streaming.status.api.v1
> +
> +import java.io.OutputStream
> +import java.lang.annotation.Annotation
> +import java.lang.reflect.Type
> +import java.text.SimpleDateFormat
> +import java.util.{Calendar, SimpleTimeZone}
> +import javax.ws.rs.Produces
> +import javax.ws.rs.core.{MediaType, MultivaluedMap}
> +import javax.ws.rs.ext.{MessageBodyWriter, Provider}
> +
> +import com.fasterxml.jackson.annotation.JsonInclude
> +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
> +
> +@Provider
> +@Produces(Array(MediaType.APPLICATION_JSON))
> +private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
> +
> +  val mapper = new ObjectMapper() {
> +    override def writeValueAsString(t: Any): String = {
> +      super.writeValueAsString(t)
> +    }
> +  }
> +  mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
> +  mapper.enable(SerializationFeature.INDENT_OUTPUT)
> +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
> +  mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
> +
> +  override def isWriteable(
> +      aClass: Class[_],
> +      `type`: Type,
> +      annotations: Array[Annotation],
> +      mediaType: MediaType): Boolean = {
> +      true
> +  }
> +
> +  override def writeTo(
> +      t: Object,
> +      aClass: Class[_],
> +      `type`: Type,
> +      annotations: Array[Annotation],
> +      mediaType: MediaType,
> +      multivaluedMap: MultivaluedMap[String, AnyRef],
> +      outputStream: OutputStream): Unit = {
> +    t match {
> +      //case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
> +      case _ => mapper.writeValue(outputStream, t)
> +    }
> +  }
> +
> +  override def getSize(
> +      t: Object,
> +      aClass: Class[_],
> +      `type`: Type,
> +      annotations: Array[Annotation],
> +      mediaType: MediaType): Long = {
> +    -1L
> +  }
> +}
> +
> +private[spark] object JacksonMessageWriter {
> +  def makeISODateFormat: SimpleDateFormat = {
> +    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
> +    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
> +    iso8601.setCalendar(cal)
> +    iso8601
> +  }
> +}
> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
> new file mode 100644
> index 0000000..f4e43dd
> --- /dev/null
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
> @@ -0,0 +1,74 @@
> +package org.apache.spark.streaming.status.api.v1
> +
> +import org.apache.spark.status.api.v1.UIRoot
> +import org.eclipse.jetty.server.handler.ContextHandler
> +import org.eclipse.jetty.servlet.ServletContextHandler
> +import org.eclipse.jetty.servlet.ServletHolder
> +
> +import com.sun.jersey.spi.container.servlet.ServletContainer
> +
> +import javax.servlet.ServletContext
> +import javax.ws.rs.Path
> +import javax.ws.rs.Produces
> +import javax.ws.rs.core.Context
> +import org.apache.spark.streaming.ui.StreamingJobProgressListener
> +
> +
> +@Path("/v1")
> +private[v1] class StreamingApiRootResource extends UIRootFromServletContext{
> +
> +  @Path("streaminginfo")
> +  def getStreamingInfo(): StreamingInfoResource = {
> +    new StreamingInfoResource(uiRoot,listener)
> +  }
> +
> +}
> +
> +private[spark] object StreamingApiRootResource {
> +
> +  def getServletHandler(uiRoot: UIRoot, listener:StreamingJobProgressListener): ServletContextHandler = {
> +
> +    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
> +    jerseyContext.setContextPath("/streamingapi")
> +    val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
> +    holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
> +      "com.sun.jersey.api.core.PackagesResourceConfig")
> +    holder.setInitParameter("com.sun.jersey.config.property.packages",
> +      "org.apache.spark.streaming.status.api.v1")
> +    //holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
> +    //  classOf[SecurityFilter].getCanonicalName)
> +    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
> +    UIRootFromServletContext.setListener(jerseyContext, listener)
> +    jerseyContext.addServlet(holder, "/*")
> +    jerseyContext
> +  }
> +}
> +
> +private[v1] object UIRootFromServletContext {
> +
> +  private val attribute = getClass.getCanonicalName
> +
> +  def setListener(contextHandler:ContextHandler, listener: StreamingJobProgressListener):Unit={
> +    contextHandler.setAttribute(attribute+"_listener", listener)
> +  }
> +
> +  def getListener(context:ServletContext):StreamingJobProgressListener={
> +    context.getAttribute(attribute+"_listener").asInstanceOf[StreamingJobProgressListener]
> +  }
> +
> +  def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
> +    contextHandler.setAttribute(attribute, uiRoot)
> +  }
> +
> +  def getUiRoot(context: ServletContext): UIRoot = {
> +    context.getAttribute(attribute).asInstanceOf[UIRoot]
> +  }
> +}
> +
> +private[v1] trait UIRootFromServletContext {
> +  @Context
> +  var servletContext: ServletContext = _
> +
> +  def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
> +  def listener: StreamingJobProgressListener = UIRootFromServletContext.getListener(servletContext)
> +}
> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
> new file mode 100644
> index 0000000..d5fc11b
> --- /dev/null
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
> @@ -0,0 +1,22 @@
> +package org.apache.spark.streaming.status.api.v1
> +
> +import org.apache.spark.status.api.v1.SimpleDateParam
> +import org.apache.spark.status.api.v1.UIRoot
> +
> +import javax.ws.rs.GET
> +import javax.ws.rs.Produces
> +import javax.ws.rs.core.MediaType
> +import org.apache.spark.streaming.StreamingContext
> +import org.apache.spark.streaming.ui.StreamingJobProgressListener
> +
> +@Produces(Array(MediaType.APPLICATION_JSON))
> +private[v1] class StreamingInfoResource(uiRoot: UIRoot, listener: StreamingJobProgressListener){
> +
> +  @GET
> +  def streamingInfo()
> +  :Iterator[StreamingInfo]={
> +    var v = listener.numTotalCompletedBatches
> +    Iterator(new StreamingInfo("testname",v))
> +
> +  }
> +}
> \ No newline at end of file
> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
> new file mode 100644
> index 0000000..958dd41
> --- /dev/null
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
> @@ -0,0 +1,6 @@
> +package org.apache.spark.streaming.status.api.v1
> +
> +class StreamingInfo private[streaming](
> +    val name:String,
> +    val completedBatchCount:Long)
> +
> \ No newline at end of file
> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
> index bc53f2a..877abf4 100644
> --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
> @@ -22,6 +22,7 @@ import org.apache.spark.streaming.StreamingContext
>  import org.apache.spark.ui.{SparkUI, SparkUITab}
>
>  import StreamingTab._
> +import org.apache.spark.streaming.status.api.v1.StreamingApiRootResource
>
>  /**
>   * Spark Web UI tab that shows statistics of a streaming job.
> @@ -39,6 +40,9 @@ private[spark] class StreamingTab(val ssc: StreamingContext)
>    ssc.sc.addSparkListener(listener)
>    attachPage(new StreamingPage(this))
>    attachPage(new BatchPage(this))
> +
> +  //register streaming api
> +  parent.attachHandler(StreamingApiRootResource.getServletHandler(parent,listener));
>
>    def attach() {
>      getSparkUI(ssc).attachTab(this)
>
>
> On 9/14/16 10:13 AM, Chan Chor Pang wrote:
>
>> Hi everyone,
>>
>> I was trying to monitor our streaming application using the Spark REST
>> interface, only to find that there is no such thing for Streaming.
>>
>> I wonder whether anyone is already working on this, or should I just start
>> implementing my own?
>>
>
>
>
>

Re: REST api for monitoring Spark Streaming

Posted by Chan Chor Pang <ch...@indetail.co.jp>.
Hi everyone,

It seems that not many people are interested in creating an API for
Streaming.
Nevertheless, I still really want an API for monitoring,
so I tried to see whether I could implement it on my own.

After some testing,
I believe I can achieve the goal by:
1. implementing a package (org.apache.spark.streaming.status.api.v1) that
serves the same purpose as org.apache.spark.status.api.v1,
2. registering the API path through StreamingTab, and
3. retrieving the streaming information through
StreamingJobProgressListener.
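
As a rough illustration of how the resulting endpoint might be consumed, here is a minimal client sketch. The path is pieced together from the test code in this message (context path "/streamingapi", root resource "/v1", sub-resource "streaminginfo"). The object name StreamingInfoClient, the helper names, the timeouts, and the host/port passed in are assumptions for illustration only and are not part of the patch.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Hypothetical client helper; not part of the patch.
object StreamingInfoClient {

  // Builds the endpoint URL from the pieces used in the test code:
  // context path "/streamingapi" + @Path("/v1") + @Path("streaminginfo").
  def streamingInfoUrl(host: String, port: Int): String =
    s"http://$host:$port/streamingapi/v1/streaminginfo"

  // Fetches the raw JSON body; parsing is left to the caller.
  // Timeouts (in milliseconds) are arbitrary illustrative values.
  def fetch(url: String): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    conn.setRequestProperty("Accept", "application/json")
    conn.setConnectTimeout(5000)
    conn.setReadTimeout(5000)
    try {
      val src = Source.fromInputStream(conn.getInputStream, "UTF-8")
      try src.mkString finally src.close()
    } finally {
      conn.disconnect()
    }
  }
}
```

Assuming the driver UI listens on its usual port on the local machine, a call would look like StreamingInfoClient.fetch(StreamingInfoClient.streamingInfoUrl("localhost", 4040)).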

My main concern now is whether my implementation could be merged into
the mainline.

I'm new to open source projects, so could anyone please give me some guidance?
How should I proceed so that my implementation can be merged
upstream?


Here is my test code, based on v1.6.0:
###################################
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
new file mode 100644
index 0000000..690e2d8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
@@ -0,0 +1,68 @@
+package org.apache.spark.streaming.status.api.v1
+
+import java.io.OutputStream
+import java.lang.annotation.Annotation
+import java.lang.reflect.Type
+import java.text.SimpleDateFormat
+import java.util.{Calendar, SimpleTimeZone}
+import javax.ws.rs.Produces
+import javax.ws.rs.core.{MediaType, MultivaluedMap}
+import javax.ws.rs.ext.{MessageBodyWriter, Provider}
+
+import com.fasterxml.jackson.annotation.JsonInclude
+import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
+
+@Provider
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
+
+  val mapper = new ObjectMapper() {
+    override def writeValueAsString(t: Any): String = {
+      super.writeValueAsString(t)
+    }
+  }
+  mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
+  mapper.enable(SerializationFeature.INDENT_OUTPUT)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
+
+  override def isWriteable(
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType): Boolean = {
+      true
+  }
+
+  override def writeTo(
+      t: Object,
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType,
+      multivaluedMap: MultivaluedMap[String, AnyRef],
+      outputStream: OutputStream): Unit = {
+    t match {
+      //case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
+      case _ => mapper.writeValue(outputStream, t)
+    }
+  }
+
+  override def getSize(
+      t: Object,
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType): Long = {
+    -1L
+  }
+}
+
+private[spark] object JacksonMessageWriter {
+  def makeISODateFormat: SimpleDateFormat = {
+    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
+    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
+    iso8601.setCalendar(cal)
+    iso8601
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
new file mode 100644
index 0000000..f4e43dd
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
@@ -0,0 +1,74 @@
+package org.apache.spark.streaming.status.api.v1
+
+import org.apache.spark.status.api.v1.UIRoot
+import org.eclipse.jetty.server.handler.ContextHandler
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.eclipse.jetty.servlet.ServletHolder
+
+import com.sun.jersey.spi.container.servlet.ServletContainer
+
+import javax.servlet.ServletContext
+import javax.ws.rs.Path
+import javax.ws.rs.Produces
+import javax.ws.rs.core.Context
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+
+@Path("/v1")
+private[v1] class StreamingApiRootResource extends UIRootFromServletContext{
+
+  @Path("streaminginfo")
+  def getStreamingInfo(): StreamingInfoResource = {
+    new StreamingInfoResource(uiRoot,listener)
+  }
+
+}
+
+private[spark] object StreamingApiRootResource {
+
+  def getServletHandler(uiRoot: UIRoot, listener:StreamingJobProgressListener): ServletContextHandler = {
+
+    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
+    jerseyContext.setContextPath("/streamingapi")
+    val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
+    holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
+      "com.sun.jersey.api.core.PackagesResourceConfig")
+    holder.setInitParameter("com.sun.jersey.config.property.packages",
+      "org.apache.spark.streaming.status.api.v1")
+    //holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
+    //  classOf[SecurityFilter].getCanonicalName)
+    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
+    UIRootFromServletContext.setListener(jerseyContext, listener)
+    jerseyContext.addServlet(holder, "/*")
+    jerseyContext
+  }
+}
+
+private[v1] object UIRootFromServletContext {
+
+  private val attribute = getClass.getCanonicalName
+
+  def setListener(contextHandler:ContextHandler, listener: StreamingJobProgressListener):Unit={
+    contextHandler.setAttribute(attribute+"_listener", listener)
+  }
+
+  def getListener(context:ServletContext):StreamingJobProgressListener={
+    context.getAttribute(attribute+"_listener").asInstanceOf[StreamingJobProgressListener]
+  }
+
+  def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
+    contextHandler.setAttribute(attribute, uiRoot)
+  }
+
+  def getUiRoot(context: ServletContext): UIRoot = {
+    context.getAttribute(attribute).asInstanceOf[UIRoot]
+  }
+}
+
+private[v1] trait UIRootFromServletContext {
+  @Context
+  var servletContext: ServletContext = _
+
+  def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
+  def listener: StreamingJobProgressListener = UIRootFromServletContext.getListener(servletContext)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
new file mode 100644
index 0000000..d5fc11b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
@@ -0,0 +1,22 @@
+package org.apache.spark.streaming.status.api.v1
+
+import org.apache.spark.status.api.v1.SimpleDateParam
+import org.apache.spark.status.api.v1.UIRoot
+
+import javax.ws.rs.GET
+import javax.ws.rs.Produces
+import javax.ws.rs.core.MediaType
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class StreamingInfoResource(uiRoot: UIRoot, listener: StreamingJobProgressListener){
+
+  @GET
+  def streamingInfo()
+  :Iterator[StreamingInfo]={
+    var v = listener.numTotalCompletedBatches
+    Iterator(new StreamingInfo("testname",v))
+
+  }
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
new file mode 100644
index 0000000..958dd41
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
@@ -0,0 +1,6 @@
+package org.apache.spark.streaming.status.api.v1
+
+class StreamingInfo private[streaming](
+    val name:String,
+    val completedBatchCount:Long)
+
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index bc53f2a..877abf4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -22,6 +22,7 @@ import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.ui.{SparkUI, SparkUITab}

  import StreamingTab._
+import org.apache.spark.streaming.status.api.v1.StreamingApiRootResource

  /**
   * Spark Web UI tab that shows statistics of a streaming job.
@@ -39,6 +40,9 @@ private[spark] class StreamingTab(val ssc: StreamingContext)
    ssc.sc.addSparkListener(listener)
    attachPage(new StreamingPage(this))
    attachPage(new BatchPage(this))
+
+  //register streaming api
+  parent.attachHandler(StreamingApiRootResource.getServletHandler(parent,listener));

    def attach() {
      getSparkUI(ssc).attachTab(this)
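
For reference, the date format configured in JacksonMessageWriter above can be tried on its own. The sketch below copies makeISODateFormat from the patch into a standalone object (the name IsoDateFormatDemo is only for illustration) and formats the Unix epoch. Note that the pattern appends a literal "GMT" suffix rather than the "Z" designator that strict ISO 8601 uses, and the expected output assumes the JVM's default calendar is Gregorian.

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, SimpleTimeZone}

// Standalone copy of the helper from the patch, for experimentation only.
object IsoDateFormatDemo {

  // Same pattern and calendar setup as JacksonMessageWriter.makeISODateFormat.
  def makeISODateFormat: SimpleDateFormat = {
    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
    // Pin the formatter to a zero-offset time zone so output does not
    // depend on the machine's local time zone.
    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
    iso8601.setCalendar(cal)
    iso8601
  }

  def main(args: Array[String]): Unit = {
    // With a Gregorian default calendar this prints 1970-01-01T00:00:00.000GMT
    println(makeISODateFormat.format(new Date(0L)))
  }
}
```

SimpleDateFormat is not thread-safe, so creating a fresh instance per writer, as the helper does, avoids sharing one formatter across objects.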


On 9/14/16 10:13 AM, Chan Chor Pang wrote:
> Hi everyone,
>
> I was trying to monitor our streaming application using the Spark REST
> interface, only to find that there is no such thing for Streaming.
>
> I wonder whether anyone is already working on this, or should I just start
> implementing my own?

