Posted to dev@eagle.apache.org by "Hao Chen (JIRA)" <ji...@apache.org> on 2015/12/01 09:35:10 UTC

[jira] [Updated] (EAGLE-66) Eagle TypeSafe Stream Processing DSL

     [ https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hao Chen updated EAGLE-66:
--------------------------
    Description: 
Here is an end-to-end type-safe API for stream processing: https://github.corp.ebay.com/gist/hchen9/fbcf1af39134ba09b27d

First, in the application developer's space, developers should not need to care about the internal data structures exchanged between processing elements. The API should let users process a stream just like a collection of typed objects. For example, in the following case we should allow both:
In a type-safe way: groupBy(_.user)
In a field-based way: groupBy("user")
https://github.corp.ebay.com/gist/hchen9/fbcf1af39134ba09b27d#file-typesafed-api-eagle-scala-L95-L145
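On an ordinary Scala collection, the type-safe form already reads this way, which is the feel the stream API aims for. The sketch below uses only the standard library and a plain copy of the AuditLog fields from the example; the sample records are made up for illustration.

```scala
// Plain case class mirroring the AuditLog fields in the proposal (no @StreamDef here).
case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

object CollectionAnalogy {
  // Made-up sample records, for illustration only.
  val logs: List[AuditLog] = List(
    AuditLog(1000L, "alice", "/tmp/a", "open"),
    AuditLog(1001L, "bob", "/tmp/b", "delete"),
    AuditLog(1002L, "alice", "/tmp/c", "open")
  )

  // The key is a function AuditLog => String; a typo such as _.usr is a
  // compile error, whereas a misspelled field-name string would only fail at runtime.
  val byUser: Map[String, List[AuditLog]] = logs.groupBy(_.user)

  // Number of events per user, derived from the grouping above.
  val countsByUser: Map[String, Int] =
    byUser.map { case (user, events) => user -> events.size }
}
```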

@StreamDef("hdfsAuditLogStream")
case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

object TypedSafeApp extends App {
  val ec = ExecutionContext.get[StormExecutionContext]
  ec.fromKafka(ec.getConfig) { bytes =>
      // Placeholder decoder: a real application would parse `bytes` into an AuditLog
      AuditLog(1000, "someone", "/tmp/private", "open")
    }.parallism(123)                  // Stream[AuditLog]
    .groupBy(_.user).parallism(12)    // Stream[AuditLog]
    .map { obj => (obj.user, obj) }   // Stream[(String, AuditLog)]
    .groupBy(_._1)                    // Stream[(String, AuditLog)]
    .map(_._2)                        // Stream[AuditLog]
    .alert.persistAndEmail            // Stream[AlertAPIEntity]
}
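The type comments in the example are exactly what a compiler can check. As a hedged sketch, the hypothetical single-partition LocalStream below (not Eagle's Stream API, and with no Kafka, Storm, or alerting) mirrors the groupBy/map steps with explicit type ascriptions matching each comment.

```scala
// Hypothetical single-partition stand-in for a typed stream; NOT Eagle's Stream API.
final case class LocalStream[T](elements: Seq[T]) {
  def map[R](f: T => R): LocalStream[R] = LocalStream(elements.map(f))

  // In a distributed runtime, groupBy only decides which task receives each
  // element. With a single local partition there is nothing to route, so the
  // elements and the element type pass through unchanged.
  def groupBy[K](key: T => K): LocalStream[T] = this
}

case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

object TypeFlow {
  val source: LocalStream[AuditLog] =
    LocalStream(Seq(AuditLog(1000L, "someone", "/tmp/private", "open")))

  // Each ascription mirrors a type comment in the proposal; a mismatch
  // would be a compile error rather than a runtime failure.
  val grouped: LocalStream[AuditLog]             = source.groupBy(_.user)
  val keyed: LocalStream[(String, AuditLog)]     = grouped.map(obj => (obj.user, obj))
  val regrouped: LocalStream[(String, AuditLog)] = keyed.groupBy(_._1)
  val unkeyed: LocalStream[AuditLog]             = regrouped.map(_._2)
}
```

The point of the sketch is the signatures: groupBy keeps the element type, and only map changes it, so the developer never sees the framework's internal tuple layout.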

In the framework space, especially for groupBy semantics, nothing should look different to the developer, but internally the exchanged structure is (AnyRef, AuditLog):
For groupBy(_.user), where obj is of type AuditLog, the exchanged tuple is always (obj.user, obj), with fieldsGrouping on field 0 in Storm.
For groupBy("user"), where obj is of type AuditLog, it is (readValueByReflect("user", obj), obj), with fieldsGrouping on field 0 in Storm.
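The two encodings can be sketched without Storm. The helpers below are illustrative assumptions: readValueByReflect is implemented here with plain Java reflection on the case-class accessor (the proposal does not show its body), and targetTask is a hypothetical local model of fieldsGrouping on field 0, not Storm's actual hashing.

```scala
case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

object GroupByEncoding {
  // groupBy(_.user): the key comes from a compiler-checked function.
  // Curried so that T is fixed by `obj` before the key lambda is typed.
  def keyedByFunction[T](obj: T)(key: T => Any): (AnyRef, T) =
    (key(obj).asInstanceOf[AnyRef], obj)

  // groupBy("user"): the key is read by name at runtime. Assumes the field
  // has a public zero-argument accessor, which Scala case classes generate.
  def readValueByReflect(field: String, obj: AnyRef): AnyRef =
    obj.getClass.getMethod(field).invoke(obj)

  def keyedByField[T <: AnyRef](obj: T, field: String): (AnyRef, T) =
    (readValueByReflect(field, obj), obj)

  // Hypothetical local model of fieldsGrouping on field 0: equal keys always
  // map to the same task index in [0, numTasks).
  def targetTask[T](tuple: (AnyRef, T), numTasks: Int): Int =
    java.lang.Math.floorMod(tuple._1.hashCode, numTasks)
}
```

Because both paths produce the same (key, obj) pair, the downstream grouping on field 0 does not need to know which form the developer wrote.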

> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
>                 Key: EAGLE-66
>                 URL: https://issues.apache.org/jira/browse/EAGLE-66
>             Project: Eagle
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)