Posted to dev@eagle.apache.org by "Hao Chen (JIRA)" <ji...@apache.org> on 2015/12/01 09:36:11 UTC

[jira] [Commented] (EAGLE-66) Eagle TypeSafe Stream Processing DSL

    [ https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033340#comment-15033340 ] 

Hao Chen commented on EAGLE-66:
-------------------------------

On behalf of Edward [~yonzhang2012]

I think that is a good discussion. More input to refine our designs is welcome.

I have some comments.
1. It’s not necessary to have a UI to define a topology. If the design is good enough, we can support the following ways:
    1.1 Programmatically (java/scala)   <———> similar to programming with M/R framework directly
    1.2 UI <———> visualize every step in topology (more than HUE)
>> In fact, the toJSON method here just describes each node with the desired info, so that the whole DAG can be described the same way.

    1.3 Script <——> similar to PIG script which provides abstraction over analytics on top of M/R framework
>> Scala supports writing Scala code in a script, for example:
sample-scala-code.sh


#!/bin/sh
exec scala "$0" "$@"
!#

object HelloWorld extends App {
  println("Hello, world!")
}

HelloWorld.main(args)

 The ULTIMATE goal is that users can use our system without any coding, or with coding only on top of Eagle UDF interfaces.
>> Additionally, SBT also supports overriding the context of the REPL, so that we could provide a Spark-like REPL customized for Eagle:
$ ./eagle-repl.sh
> datasource “apollo-phx”
> topology:all
[0]apollo-phx-hdfsAuditLog-topology-1-1448329773 (http://phxapdes0002.stratus.phx.ebay.com:8080/topology.html?id=apollo-phx-hdfsAuditLog-topology-1-1448329773)
[1]apollo-phx-hdfsAuditLog-topology-1-1448329773 (http://phxapdes0002.stratus.phx.ebay.com:8080/topology.html?id=apollo-phx-hdfsAuditLog-topology-1-1448329773)
…
> policy:get “HDFS”
[0] some-policy
..
> ec.fromKafka().map(_.user).alert.persistAndEmail
Kafka[topic:some] 
-> map[_.user]
-> alert[persistable=true,email=true]
> ec.execute
<compiling submitting topology inline>
…

2. If we allow users to code complex transformations, we should restrict those transformations to implementing the UDF interface only. For example, EAGLE should not allow users to spawn a thread, etc.
     EAGLE handles all the complexity at the framework level, such as externalJoin, alert, state management, data skew reduction, policy skew reduction, etc.
     Those complexities are monitoring-specific. We should identify all the common needs of monitoring-specific applications.
>> Agreed. The DAG layer should not contain any execution-layer content such as threads, Storm, or any Eagle-specific element.

3. GROUPBY expansion
    Yes, we should be flexible enough.
    x: T -> U
    y: U -> W
    flatmap(x).groupby(“user”).flatmap(y)
    As long as there exists a function f: U -> (String,_), which gets the field “user” of String type from an object of type U,
     then another function must exist, g: (String, U) -> U.
    Then the framework would do the following conversions:
    upstream:        flatmap(f(x))
    downstream:   flatmap(y(g))
     Here, the framework should figure out f and g by reflection etc.

But I am not sure if groupby(_.user) would work. How do we know which field to extract at runtime?

>> Firstly, groupBy(_.user) works according to the following definition. The parameter is just a key generator function. Another, more typical case would look like groupBy((_.user,_.path)), which in the end generates a two-field tuple ((_.user,_.path),_) where the key is (obj.user,obj.path) and the value is obj. The framework should only require the developer to handle the value part, just as the map function in the example below does: .map { obj => (obj.user,obj)}. The key part is only used by the framework to shuffle by field #0.

  def groupBy[O](keyer: T => O) = GroupbyStreamProducer[T](name,FunctionGrouping(keyer))
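The key-generator idea can be sketched in plain Scala collections, without any Eagle or Storm classes. Everything below (keyed, values, partitionOf, the sample data) is a hypothetical stand-in for illustration only; keyed plays the role of f and values the role of g from the GROUPBY expansion above.

```scala
// Hypothetical stand-in for the event type; not Eagle's actual class.
case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

// Role of f: wrap each event as (key, value) using a key generator function.
def keyed[T, K](events: Seq[T])(keyer: T => K): Seq[(K, T)] =
  events.map(e => (keyer(e), e))

// Role of g: drop the key again so downstream code only sees the value part.
def values[K, T](pairs: Seq[(K, T)]): Seq[T] =
  pairs.map(_._2)

// Simulates shuffling on field #0: equal keys always land on the same partition.
def partitionOf[K](key: K, partitions: Int): Int =
  Math.floorMod(key.hashCode, partitions)

val logs = Seq(
  AuditLog(1000L, "alice", "/tmp/private", "open"),
  AuditLog(1001L, "bob", "/tmp/public", "open"),
  AuditLog(1002L, "alice", "/tmp/private", "delete")
)

// Single-field key and composite key, both without reflection.
val byUser = keyed(logs)(_.user)
val byUserAndPath = keyed(logs)(e => (e.user, e.path))
```

Because the key is computed by an ordinary function, no field name has to be resolved at runtime; the developer-facing type stays AuditLog while the exchange type is (key, AuditLog).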

>> Secondly, the current implementation is in fact type-safe only and not very good at field-based processing, so reflection is not recommended here. The real field-based processing pattern I have in mind should look like: https://github.com/twitter/scalding/wiki/Fields-based-API-Reference

  pipe.mapTo(() -> ('age, 'weight)) { ... }
    .partition('age -> 'isAdult) { _ > 18 } { _.average('weight) }

4. Graph expansion 
    Today we use graph expansion to rewrite the graph, because we expose a simple interface to developers but let the framework handle the complexity.
    The GROUPBY expansion above is one example.
    As a monitoring framework, EAGLE should generalize more and more of the common needs of monitoring applications and turn them into a high-level DSL, while EAGLE does the graph expansion behind the scenes.

>> Agreed. Never put execution details or the optimizer in the DAG definition layer; let the framework handle all of that in the execution environment.

5. Properties of vertex and edge in topology DAG
    Property 1: vertex1’s output must be the same as vertex2’s input if there is an edge between vertex1 and vertex2
          This is bad because the user has to take care of both vertex1 and vertex2 if the event schema changes

>> If we could enable a type-safe programming style and use key-value internally, users would no longer need to care about the event schema anywhere in the pipeline, because it always has only two fields, “key” and “value”. The exception is the policy evaluation part, but if that part could use reflection to get the event fields, there is no conflict either. The schema problem can then be handled in the compile phase.

    Property 2: at compile time, a vertex includes flatmap, map, filter etc.; an edge includes event schema, partitioner etc.

    Property 3: at runtime, a vertex includes # of partitions, # of ways etc.; an edge includes # of events
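As a rough illustration of how Property 1 could be checked in the compile phase rather than by hand, here is a minimal sketch. Vertex, connect and the two sample vertices are hypothetical names for illustration, not Eagle's DAG classes.

```scala
// Hypothetical DAG vertex: a named flatmap-style function from I to zero or more O.
final case class Vertex[I, O](name: String, fn: I => Seq[O]) {
  // An edge to `next` only compiles when next's input type equals this vertex's output type.
  def connect[W](next: Vertex[O, W]): Vertex[I, W] =
    Vertex[I, W](s"$name -> ${next.name}", (in: I) => fn(in).flatMap(next.fn))
}

// Parses "user,count" lines; malformed lines produce no output.
val parse = Vertex[String, (String, Int)]("parse", line =>
  line.split(",") match {
    case Array(u, n) => Seq((u, n.toInt))
    case _           => Seq.empty
  })

// Keeps only users whose count reaches the threshold of 5.
val filterBig = Vertex[(String, Int), String]("filter", {
  case (u, n) => if (n >= 5) Seq(u) else Seq.empty
})

val pipeline = parse.connect(filterBig)
// filterBig.connect(parse) would be rejected by the compiler: String is not (String, Int).
```

With this shape, a schema change in one vertex shows up as a type error at the edge instead of a runtime mismatch.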


6. Stream dynamic join (topology merge) https://issues.apache.org/jira/browse/EAGLE-26
    One practical problem: a user has multiple metrics, each on a different Kafka topic. If we want to alert based on 2 of the metrics, how do we do that without any coding?

>> Simple solution: use a single topic shared by the different streams: {“name”:”metric-name”,”value”:0.1, other fields …}

    Of course we could start a new topology and incorporate the new policy, but isn’t that too heavy and inflexible?
    One topology per metric is a heavy solution.

>> Solution 1: Start a new topology
>> Solution 2: Share the same topology and isolate stream flows with an internal customized grouping key. For example, we could put the virtual topology id/policy id in the first position of the partition key and send the stream to the downstream policies that subscribe to it. I am not sure how complex that would be.
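A minimal sketch of what Solution 2 might look like, assuming the shared-topic event shape {name, value} from the simple solution above. Metric, RoutingKey, subscriptions and route are all hypothetical names; nothing here is an existing Eagle API.

```scala
// Event shape from the shared topic: {"name": ..., "value": ...}
case class Metric(name: String, value: Double)

// Partition key with the policy id in the first position, then the business key.
case class RoutingKey(policyId: String, groupKey: String)

// Hypothetical subscription table: policy id -> metric names the policy needs.
val subscriptions: Map[String, Set[String]] = Map(
  "policy-cpu-and-mem" -> Set("cpu", "mem"),
  "policy-disk" -> Set("disk")
)

// Emit one (key, event) pair per subscribed policy; unsubscribed metrics are dropped.
def route(m: Metric, host: String): Seq[(RoutingKey, Metric)] =
  subscriptions.toSeq.collect {
    case (policyId, names) if names.contains(m.name) =>
      (RoutingKey(policyId, host), m)
  }

val cpu = route(Metric("cpu", 0.9), "host1")
val mem = route(Metric("mem", 0.7), "host1")
```

Both metrics for the same host carry the same RoutingKey, so a grouping on that key would bring them to the same downstream policy instance inside one shared topology.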

7. Analytics DSL
    We don’t have the following features:
    aggregation on stream - tumbling window, sliding window, count, cardinality estimation
    I suggest we use Siddhi for that

>> Agreed. I have been thinking about using Siddhi as a general-purpose processing engine in Eagle, like:

ec.fromKafka[AuditLog]
    .groupBy(_.user)
    .query( """
        |from hdfsAuditLogEventStream[(src == '/tmp/private')]#window.externalTime(timestamp,10 min)
        |select user, count(timestamp) as aggValue
        |group by user
        |having aggValue >= 5
        |insert into anotherAlertStream;
        |""".stripMargin)
// hdfsAuditLogEventStream -> anotherAlertStream
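
To make the intent of the query concrete, the sketch below computes a comparable result over an in-memory batch. It is a simplification under stated assumptions: it uses fixed 10-minute tumbling windows rather than Siddhi's externalTime window semantics, it assumes non-negative timestamps in milliseconds, and tumblingCounts, alerts and the sample events are hypothetical names and data.

```scala
// Hypothetical stand-in for the event type used in the query.
case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

// Count events per (window start, user) using fixed-size tumbling windows.
def tumblingCounts(events: Seq[AuditLog], windowMs: Long): Map[(Long, String), Int] =
  events
    .groupBy(e => (e.timestamp / windowMs * windowMs, e.user))
    .map { case (key, es) => key -> es.size }

// Filter by path, aggregate per window and user, keep groups reaching the threshold.
def alerts(events: Seq[AuditLog], windowMs: Long, threshold: Int): Map[(Long, String), Int] =
  tumblingCounts(events.filter(_.path == "/tmp/private"), windowMs)
    .filter { case (_, count) => count >= threshold }

val tenMinutes = 10 * 60 * 1000L

val events =
  (0 until 5).map(i => AuditLog(i * 1000L, "alice", "/tmp/private", "open")) ++
    Seq(AuditLog(tenMinutes, "alice", "/tmp/private", "open")) ++
    (1 to 2).map(i => AuditLog(i * 1000L, "bob", "/tmp/private", "open")) ++
    (0 until 5).map(i => AuditLog(i * 1000L, "carol", "/tmp/public", "open"))

val result = alerts(events, tenMinutes, 5)
```

Only alice's first window reaches the threshold: bob has too few events, carol's path does not match, and alice's sixth event falls into the next window.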


From: "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com>
Date: Thursday, November 26, 2015 at 7:34 AM
To: "Chen, Hao" <Ha...@ebay.com>
Cc: "Jiang, Jilin" <ji...@ebay.com>, "Su, Liangfei" <li...@ebay.com>, "Sun, Libin" <li...@ebay.com>, Qingwen Zhao <qi...@ebay.com>
Subject: Re: fix issue in production and work on business features

I think that is a good discussion. More input to refine our designs is welcome.

I have some comments.
1. It’s not necessary to have a UI to define a topology. If the design is good enough, we can support the following ways:
    1.1 Programmatically (java/scala)   <———> similar to programming with M/R framework directly
    1.2 UI <———> visualize every step in topology (more than HUE)
    1.3 Script <——> similar to PIG script which provides abstraction over analytics on top of M/R framework
 The ULTIMATE goal is that users can use our system without any coding, or with coding only on top of Eagle UDF interfaces.

2. If we allow users to code complex transformations, we should restrict those transformations to implementing the UDF interface only. For example, EAGLE should not allow users to spawn a thread, etc.
     EAGLE handles all the complexity at the framework level, such as externalJoin, alert, state management, data skew reduction, policy skew reduction, etc.
     Those complexities are monitoring-specific. We should identify all the common needs of monitoring-specific applications.

3. GROUPBY expansion
    Yes, we should be flexible enough.
    x: T -> U
    y: U -> W
    flatmap(x).groupby(“user”).flatmap(y)
    As long as there exists a function f: U -> (String,_), which gets the field “user” of String type from an object of type U,
     then another function must exist, g: (String, U) -> U.
    Then the framework would do the following conversions:
    upstream:        flatmap(f(x))
    downstream:   flatmap(y(g))
     Here, the framework should figure out f and g by reflection etc.

But I am not sure if groupby(_.user) would work. How do we know which field to extract at runtime?

4. Graph expansion 
    Today we use graph expansion to rewrite the graph, because we expose a simple interface to developers but let the framework handle the complexity.
    The GROUPBY expansion above is one example.
    As a monitoring framework, EAGLE should generalize more and more of the common needs of monitoring applications and turn them into a high-level DSL, while EAGLE does the graph expansion behind the scenes.

5. Properties of vertex and edge in topology DAG
    Property 1: vertex1’s output must be the same as vertex2’s input if there is an edge between vertex1 and vertex2
          This is bad because the user has to take care of both vertex1 and vertex2 if the event schema changes
          
    Property 2: at compile time, a vertex includes flatmap, map, filter etc.; an edge includes event schema, partitioner etc.

    Property 3: at runtime, a vertex includes # of partitions, # of ways etc.; an edge includes # of events

6. Stream dynamic join (topology merge) https://issues.apache.org/jira/browse/EAGLE-26
    One practical problem: a user has multiple metrics, each on a different Kafka topic. If we want to alert based on 2 of the metrics, how do we do that without any coding?
    Of course we could start a new topology and incorporate the new policy, but isn’t that too heavy and inflexible?
    One topology per metric is a heavy solution.


7. Analytics DSL
    We don’t have the following features:
    aggregation on stream - tumbling window, sliding window, count, cardinality estimation
    I suggest we use Siddhi for that



> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
>                 Key: EAGLE-66
>                 URL: https://issues.apache.org/jira/browse/EAGLE-66
>             Project: Eagle
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0
>
>
> Here is an end-to-end type-safe API for stream processing: https://github.corp.ebay.com/gist/hchen9/fbcf1af39134ba09b27d
> Firstly, in the application developer space, developers should not need to care about exchanging internal data structures between processing elements. Users should be able to process the stream just like processing a collection of typed objects. For example, in the following case, we should allow:
> In a type-safe way: groupBy(_.user)
> In a field-based way: groupBy("user")
> https://github.corp.ebay.com/gist/hchen9/fbcf1af39134ba09b27d#file-typesafed-api-eagle-scala-L95-L145
> @StreamDef("hdfsAuditLogStream")
> case class AuditLog(timestamp:Long,user:String,path:String,cmd:String)
> object TypedSafeApp extends App{
>   val ec = ExecutionContext.get[StormExecutionContext]
>   ec.fromKafka(ec.getConfig){ bytes => AuditLog(1000,"someone","/tmp/private","open")}.parallism(123)// Stream[AuditLog]
>     .groupBy(_.user).parallism(12)//  Stream[AuditLog]
>     .map { obj => (obj.user,obj)}//  Stream[(String, AuditLog)]
>     .groupBy(_._1)// Stream[(String, AuditLog)]
>     .map(_._2)// Stream[AuditLog]
>     .alert.persistAndEmail// Stream[AlertAPIEntity]
> }
> In the framework space, especially for groupBy semantics, nothing should look different to the developer, but internally the exchange structure is (AnyRef, AuditLog)
> For groupBy(_.user) and an object obj of type AuditLog, it’s always (obj.user,obj) + fieldsGrouping by 0 for Storm
> For groupBy("user") and an object obj of type AuditLog, it’s (readValueByReflect("user",obj),obj) + fieldsGrouping by 0 for Storm



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)