Posted to dev@spark.apache.org by Chan Chor Pang <ch...@indetail.co.jp> on 2016/12/02 02:19:02 UTC
Re: REST api for monitoring Spark Streaming
Hi everyone,
I have finished the coding and created the PR.
The implementation is straightforward and similar to the REST API in spark-core,
but we still need someone with a Spark Streaming background to review the patch,
just to make sure everything is OK.
Could anyone please help?
https://github.com/apache/spark/pull/16000
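For anyone who wants to try the endpoint: in the v1.6.0 test patch quoted further down, the Jersey servlet is mounted at the context path /streamingapi, the root resource at @Path("/v1") and the sub-resource at @Path("streaminginfo"), so a request should go to /streamingapi/v1/streaminginfo on the driver's web UI. This is a minimal client sketch under stated assumptions: the driver runs on localhost with the default UI port 4040, and the paths in PR #16000 itself may differ from the test patch. StreamingApiClient and its methods are hypothetical names.

```scala
import scala.io.Source

object StreamingApiClient {
  // Assumption: path segments copied from the v1.6.0 test patch in this thread
  // (context path "/streamingapi", root @Path("/v1"), sub-resource "streaminginfo").
  // 4040 is Spark's default spark.ui.port; the final PR may use different paths.
  def streamingInfoUrl(host: String, port: Int = 4040): String =
    s"http://$host:$port/streamingapi/v1/streaminginfo"

  // Reads the raw JSON body; needs a running driver with the patch applied.
  def fetch(url: String): String = {
    val src = Source.fromURL(url, "UTF-8")
    try src.mkString
    finally src.close()
  }

  def main(args: Array[String]): Unit = {
    val host = args.headOption.getOrElse("localhost")
    println(fetch(streamingInfoUrl(host)))
  }
}
```

Keeping URL construction separate from fetching makes the path logic checkable without a running driver.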
On 11/8/16 1:46 PM, Chan Chor Pang wrote:
>
> Thank you.
>
> This should take me at least a few days; I will let you know as soon
> as the PR is ready.
>
>
> On 11/8/16 11:44 AM, Tathagata Das wrote:
>> This may be a good addition. I suggest you read our guidelines on
>> contributing code to Spark.
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-PreparingtoContributeCodeChanges
>>
>> It's a long document, but it should have everything you need to figure out
>> how to contribute your changes. I hope to see your changes in a
>> GitHub PR soon!
>>
>> TD
>>
>> On Mon, Nov 7, 2016 at 5:30 PM, Chan Chor Pang
>> <chin-sh@indetail.co.jp> wrote:
>>
>> Hi everyone,
>>
>> It seems that not many people are interested in creating an API
>> for Streaming.
>> Nevertheless, I still really want an API for monitoring,
>> so I tried to see if I could implement it on my own.
>>
>> After some testing,
>> I believe I can achieve the goal by:
>> 1. implementing a package (org.apache.spark.streaming.status.api.v1)
>> that serves the same purpose as org.apache.spark.status.api.v1,
>> 2. registering the API path through StreamingTab, and
>> 3. retrieving the streaming information through
>> StreamingJobProgressListener.
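The three steps above can be illustrated without Spark on the classpath. This is a minimal sketch under stated assumptions, covering steps 1 and 3 only (registration through StreamingTab is omitted): ProgressSource is a hypothetical stand-in for StreamingJobProgressListener, where only the counter name numTotalCompletedBatches mirrors the real class, and the JSON is rendered by hand instead of through Jersey and the Jackson message writer.

```scala
// Hypothetical stand-in for StreamingJobProgressListener; only the counter
// name numTotalCompletedBatches mirrors the real listener.
trait ProgressSource {
  def numTotalCompletedBatches: Long
}

// Response type, mirroring StreamingInfo in the patch's api.scala.
final case class StreamingInfo(name: String, completedBatchCount: Long)

// Step 3: the resource reads the listener on every call, so each response
// reflects the listener's state at request time.
final class StreamingInfoResource(source: ProgressSource, appName: String) {
  def streamingInfo(): Seq[StreamingInfo] =
    Seq(StreamingInfo(appName, source.numTotalCompletedBatches))
}

// Assumption: hand-rolled JSON in place of the Jackson message writer.
// Names are not escaped, so they must not contain quotes or backslashes.
object StreamingInfoJson {
  def render(infos: Seq[StreamingInfo]): String =
    infos
      .map(i => s"""{"name":"${i.name}","completedBatchCount":${i.completedBatchCount}}""")
      .mkString("[", ",", "]")
}

object StreamingInfoDemo {
  def main(args: Array[String]): Unit = {
    val source = new ProgressSource { def numTotalCompletedBatches: Long = 42L }
    val resource = new StreamingInfoResource(source, "my-streaming-app")
    println(StreamingInfoJson.render(resource.streamingInfo()))
    // prints [{"name":"my-streaming-app","completedBatchCount":42}]
  }
}
```

Reading the counter inside streamingInfo(), rather than once in the constructor, is what lets every request see current progress.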
>>
>> My main concern now is whether my implementation can be
>> merged upstream.
>>
>> I'm new to open source projects, so could anyone please shed
>> some light?
>> How should (or could) I proceed so that my implementation can be
>> merged upstream?
>>
>>
>> Here is my test code, based on v1.6.0:
>> ###################################
>> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
>> new file mode 100644
>> index 0000000..690e2d8
>> --- /dev/null
>> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
>> @@ -0,0 +1,68 @@
>> +package org.apache.spark.streaming.status.api.v1
>> +
>> +import java.io.OutputStream
>> +import java.lang.annotation.Annotation
>> +import java.lang.reflect.Type
>> +import java.text.SimpleDateFormat
>> +import java.util.{Calendar, SimpleTimeZone}
>> +import javax.ws.rs.Produces
>> +import javax.ws.rs.core.{MediaType, MultivaluedMap}
>> +import javax.ws.rs.ext.{MessageBodyWriter, Provider}
>> +
>> +import com.fasterxml.jackson.annotation.JsonInclude
>> +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
>> +
>> +@Provider
>> +@Produces(Array(MediaType.APPLICATION_JSON))
>> +private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
>> +
>> + val mapper = new ObjectMapper() {
>> + override def writeValueAsString(t: Any): String = {
>> + super.writeValueAsString(t)
>> + }
>> + }
>> + mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
>> + mapper.enable(SerializationFeature.INDENT_OUTPUT)
>> + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
>> + mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
>> +
>> + override def isWriteable(
>> + aClass: Class[_],
>> + `type`: Type,
>> + annotations: Array[Annotation],
>> + mediaType: MediaType): Boolean = {
>> + true
>> + }
>> +
>> + override def writeTo(
>> + t: Object,
>> + aClass: Class[_],
>> + `type`: Type,
>> + annotations: Array[Annotation],
>> + mediaType: MediaType,
>> + multivaluedMap: MultivaluedMap[String, AnyRef],
>> + outputStream: OutputStream): Unit = {
>> + t match {
>> + //case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
>> + case _ => mapper.writeValue(outputStream, t)
>> + }
>> + }
>> +
>> + override def getSize(
>> + t: Object,
>> + aClass: Class[_],
>> + `type`: Type,
>> + annotations: Array[Annotation],
>> + mediaType: MediaType): Long = {
>> + -1L
>> + }
>> +}
>> +
>> +private[spark] object JacksonMessageWriter {
>> + def makeISODateFormat: SimpleDateFormat = {
>> + val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
>> + val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
>> + iso8601.setCalendar(cal)
>> + iso8601
>> + }
>> +}
>> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
>> new file mode 100644
>> index 0000000..f4e43dd
>> --- /dev/null
>> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
>> @@ -0,0 +1,74 @@
>> +package org.apache.spark.streaming.status.api.v1
>> +
>> +import org.apache.spark.status.api.v1.UIRoot
>> +import org.eclipse.jetty.server.handler.ContextHandler
>> +import org.eclipse.jetty.servlet.ServletContextHandler
>> +import org.eclipse.jetty.servlet.ServletHolder
>> +
>> +import com.sun.jersey.spi.container.servlet.ServletContainer
>> +
>> +import javax.servlet.ServletContext
>> +import javax.ws.rs.Path
>> +import javax.ws.rs.Produces
>> +import javax.ws.rs.core.Context
>> +import org.apache.spark.streaming.ui.StreamingJobProgressListener
>> +
>> +
>> +@Path("/v1")
>> +private[v1] class StreamingApiRootResource extends UIRootFromServletContext{
>> +
>> + @Path("streaminginfo")
>> + def getStreamingInfo(): StreamingInfoResource = {
>> + new StreamingInfoResource(uiRoot,listener)
>> + }
>> +
>> +}
>> +
>> +private[spark] object StreamingApiRootResource {
>> +
>> + def getServletHandler(uiRoot: UIRoot, listener:StreamingJobProgressListener): ServletContextHandler = {
>> +
>> + val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
>> + jerseyContext.setContextPath("/streamingapi")
>> + val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
>> + holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
>> + "com.sun.jersey.api.core.PackagesResourceConfig")
>> + holder.setInitParameter("com.sun.jersey.config.property.packages",
>> + "org.apache.spark.streaming.status.api.v1")
>> + //holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
>> + // classOf[SecurityFilter].getCanonicalName)
>> + UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
>> + UIRootFromServletContext.setListener(jerseyContext, listener)
>> + jerseyContext.addServlet(holder, "/*")
>> + jerseyContext
>> + }
>> +}
>> +
>> +private[v1] object UIRootFromServletContext {
>> +
>> + private val attribute = getClass.getCanonicalName
>> +
>> + def setListener(contextHandler:ContextHandler, listener: StreamingJobProgressListener):Unit={
>> + contextHandler.setAttribute(attribute+"_listener", listener)
>> + }
>> +
>> + def getListener(context:ServletContext):StreamingJobProgressListener={
>> + context.getAttribute(attribute+"_listener").asInstanceOf[StreamingJobProgressListener]
>> + }
>> +
>> + def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
>> + contextHandler.setAttribute(attribute, uiRoot)
>> + }
>> +
>> + def getUiRoot(context: ServletContext): UIRoot = {
>> + context.getAttribute(attribute).asInstanceOf[UIRoot]
>> + }
>> +}
>> +
>> +private[v1] trait UIRootFromServletContext {
>> + @Context
>> + var servletContext: ServletContext = _
>> +
>> + def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
>> + def listener: StreamingJobProgressListener = UIRootFromServletContext.getListener(servletContext)
>> +}
>> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
>> new file mode 100644
>> index 0000000..d5fc11b
>> --- /dev/null
>> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
>> @@ -0,0 +1,22 @@
>> +package org.apache.spark.streaming.status.api.v1
>> +
>> +import org.apache.spark.status.api.v1.SimpleDateParam
>> +import org.apache.spark.status.api.v1.UIRoot
>> +
>> +import javax.ws.rs.GET
>> +import javax.ws.rs.Produces
>> +import javax.ws.rs.core.MediaType
>> +import org.apache.spark.streaming.StreamingContext
>> +import org.apache.spark.streaming.ui.StreamingJobProgressListener
>> +
>> +@Produces(Array(MediaType.APPLICATION_JSON))
>> +private[v1] class StreamingInfoResource(uiRoot: UIRoot, listener: StreamingJobProgressListener){
>> +
>> + @GET
>> + def streamingInfo()
>> + :Iterator[StreamingInfo]={
>> + val v = listener.numTotalCompletedBatches
>> + Iterator(new StreamingInfo("testname",v))
>> +
>> + }
>> +}
>> \ No newline at end of file
>> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
>> new file mode 100644
>> index 0000000..958dd41
>> --- /dev/null
>> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
>> @@ -0,0 +1,6 @@
>> +package org.apache.spark.streaming.status.api.v1
>> +
>> +class StreamingInfo private[streaming](
>> + val name:String,
>> + val completedBatchCount:Long)
>> +
>> \ No newline at end of file
>> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>> index bc53f2a..877abf4 100644
>> --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>> +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>> @@ -22,6 +22,7 @@ import org.apache.spark.streaming.StreamingContext
>> import org.apache.spark.ui.{SparkUI, SparkUITab}
>>
>> import StreamingTab._
>> +import org.apache.spark.streaming.status.api.v1.StreamingApiRootResource
>>
>> /**
>> * Spark Web UI tab that shows statistics of a streaming job.
>> @@ -39,6 +40,9 @@ private[spark] class StreamingTab(val ssc: StreamingContext)
>> ssc.sc.addSparkListener(listener)
>> attachPage(new StreamingPage(this))
>> attachPage(new BatchPage(this))
>> +
>> + //register streaming api
>> + parent.attachHandler(StreamingApiRootResource.getServletHandler(parent, listener))
>>
>> def attach() {
>> getSparkUI(ssc).attachTab(this)
>>
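Why the root resource reads uiRoot and listener from the servlet context: with PackagesResourceConfig, Jersey itself instantiates StreamingApiRootResource through its no-argument constructor, so getServletHandler stores both objects as attributes on the Jetty context, and the @Context-injected ServletContext lets the resource read them back and pass them to StreamingInfoResource through the sub-resource locator. The sketch below illustrates that hand-off without Jetty or Jersey; AttributeTable, Listener, ListenerAttribute, ListenerFromTable and InfoResource are hypothetical stand-ins, and the injection step is simulated by assigning the field by hand.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a servlet context's attribute table.
final class AttributeTable {
  private val attrs = mutable.Map.empty[String, AnyRef]
  def setAttribute(key: String, value: AnyRef): Unit = attrs(key) = value
  def getAttribute(key: String): AnyRef = attrs.getOrElse(key, null)
}

// Hypothetical stand-in for StreamingJobProgressListener.
final class Listener(val completedBatches: Long)

object ListenerAttribute {
  // Assumption: a fixed key; the patch derives its key from getCanonicalName
  // and appends "_listener" so it does not collide with the UIRoot attribute.
  private val Key = "streaming.api.listener"

  def set(table: AttributeTable, listener: Listener): Unit =
    table.setAttribute(Key, listener)

  // Returns None instead of null when the attribute was never set.
  def get(table: AttributeTable): Option[Listener] =
    Option(table.getAttribute(Key)).collect { case l: Listener => l }
}

// Mirrors the patch's UIRootFromServletContext trait: the framework fills in
// `table` after constructing the resource (the role @Context plays in Jersey).
trait ListenerFromTable {
  var table: AttributeTable = null
  def listener: Option[Listener] = ListenerAttribute.get(table)
}

final class InfoResource extends ListenerFromTable {
  def completedBatches: Long = listener.map(_.completedBatches).getOrElse(0L)
}

object AttributeDemo {
  def main(args: Array[String]): Unit = {
    val table = new AttributeTable
    ListenerAttribute.set(table, new Listener(5L)) // once, when the handler is built
    val resource = new InfoResource                // per request, by the framework
    resource.table = table                         // simulated injection
    println(resource.completedBatches)             // prints 5
  }
}
```

Returning an Option instead of casting with asInstanceOf, as the patch does, turns a missing attribute into None rather than a null that fails later; whether that extra wrapping is worth it in the real resource is a judgment call.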
>>
>> On 9/14/16 10:13 AM, Chan Chor Pang wrote:
>>
>> Hi everyone,
>>
>> I was trying to monitor our streaming application using the Spark
>> REST interface,
>> only to find that there is no such thing for Streaming.
>>
>> I wonder if anyone is already working on this, or should I just
>> start implementing my own?
>>
>>
>>
>>
>>
>
--
---*------------------------------------------------*---*---*---*---
INDETAIL Co., Ltd.
Nearshore General Services Division
Game Services Department
陳 楚鵬 (Chan Chor Pang)
E-mail: chin-sh@indetail.co.jp
URL: http://www.indetail.co.jp
[Sapporo Head Office / LABO / LABO2]
〒060-0042
Odori Nishi 9-chome 3-33, Chuo-ku, Sapporo
Kitako Center Building
(Sapporo Head Office / LABO2: 2nd floor; LABO: 9th floor)
TEL: 011-206-9235 FAX: 011-206-9236
[Tokyo Branch]
〒108-0014
Cross Office Mita, 5-29-20 Shiba, Minato-ku, Tokyo
TEL: 03-6809-6502 FAX: 03-6809-6504
[Nagoya Satellite Office]
〒460-0002
NAYUTA BLD, 3-17-24 Marunouchi, Naka-ku, Nagoya, Aichi
TEL: 052-971-0086