Posted to dev@s2graph.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/03/27 01:59:00 UTC

[jira] [Commented] (S2GRAPH-185) Support Spark Structured Streaming to work with data in streaming and batch

    [ https://issues.apache.org/jira/browse/S2GRAPH-185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414892#comment-16414892 ] 

ASF GitHub Bot commented on S2GRAPH-185:
----------------------------------------

GitHub user elric-k opened a pull request:

    https://github.com/apache/incubator-s2graph/pull/144

    [S2GRAPH-185] Support Spark Structured Streaming

    
    
    I added a simple job launcher using Spark Structured Streaming, which consists of three parts: source, process, and sink.
    
    Sources and sinks can use the built-in types supported by Structured Streaming, types provided by third-party libraries, or custom implementations.
    
    For data processing, an SQLProcess based on Spark SQL and UDFs is provided, and a custom process class can be implemented if necessary.
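    
    As a rough illustration of this structure (the trait and method names below are hypothetical, not the actual classes in this PR), each task kind can be thought of as follows:
    ```
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    // Illustrative only: one possible way to model the three kinds of task.
    trait Task { def name: String; def options: Map[String, String] }
    
    trait Source extends Task {
      def toDF(ss: SparkSession): DataFrame                    // read data into a DataFrame
    }
    
    trait Process extends Task {
      def execute(inputs: Map[String, DataFrame]): DataFrame   // transform the named inputs
    }
    
    trait Sink extends Task {
      def write(inputs: Map[String, DataFrame]): Unit          // write the named inputs out
    }
    ```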
     
     
    ## Current Supported Task
    ### Source
    * kafka : built-in
    * file : built-in
    * hive : built-in
    
    ### Process
    * sql : process spark sql
    * custom : implement if necessary
    
    ### Sink
    * kafka : built-in
    * file : built-in
    * es : elasticsearch-spark
    * s2graph : added
      
      * Uses the mutateElement function of the S2Graph object.
      * S2Graph-related settings are required.
      * Put the config file on the classpath or specify the settings in the job description options.
      ```
      ex)
          "type": "s2graph",
          "options": {
            "hbase.zookeeper.quorum": "",
            "db.default.driver": "",
            "db.default.url": ""
          }
      ```
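      
      As a rough sketch (not the PR's actual code), these options could be merged into the Typesafe Config that S2Graph expects; the helper name and example values below are assumptions:
      ```
      import scala.collection.JavaConverters._
      import com.typesafe.config.{Config, ConfigFactory}
      
      // Hypothetical helper: merge the sink's "options" map with whatever
      // configuration is already on the classpath (e.g. application.conf).
      def toS2GraphConfig(options: Map[String, String]): Config =
        ConfigFactory.parseMap(options.asJava).withFallback(ConfigFactory.load())
      
      // Example values only.
      val s2Config = toS2GraphConfig(Map(
        "hbase.zookeeper.quorum" -> "localhost",
        "db.default.driver"      -> "org.h2.Driver",
        "db.default.url"         -> "jdbc:h2:file:./var/metastore;MODE=MYSQL"
      ))
      ```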
    
    
    
    ## Job Description
    
    Tasks and the workflow between them are described in a job description.
    Dependencies between tasks are defined by the task names listed in the `inputs` field.
    
    I referenced [Airbnb's Airstream](https://www.slideshare.net/databricks/building-data-product-based-on-apache-spark-at-airbnb-with-jingwei-lu-and-liyin-tang) for the specification of the job description.
    
    ### Json Spec
    ```
    {
        "name": "JOB_NAME",
        "source": [
            {
                "name": "TASK_NAME",
                "inputs": [],
                "type": "SOURCE_TYPE",
                "options": {
                    "KEY" : "VALUE"
                }
            }
        ],
        "process": [
            {
                "name": "TASK_NAME",
                "inputs": ["INPUT_TASK_NAME"],
                "type": "PROCESS_TYPE",
                "options": {
                    "KEY" : "VALUE"
                }
            }
        ],
        "sink": [
            {
                "name": "TASK_NAME",
                "inputs": ["INPUT_TASK_NAME"],
                "type": "SINK_TYPE",
                "options": {
                    "KEY" : "VALUE"
                }
            }
        ]
    }
    ```
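    
    To illustrate how the `inputs` field could drive the wiring (the case classes and function below are hypothetical, not the PR's actual implementation), each task's output DataFrame is registered under its task name, and downstream tasks look their inputs up by name:
    ```
    import org.apache.spark.sql.DataFrame
    
    // Hypothetical representation of the job description above.
    case class TaskConf(name: String, `type`: String,
                        inputs: Seq[String], options: Map[String, String])
    case class JobDesc(name: String, source: Seq[TaskConf],
                       process: Seq[TaskConf], sink: Seq[TaskConf])
    
    def runJob(job: JobDesc,
               readSource: TaskConf => DataFrame,
               runProcess: (TaskConf, Map[String, DataFrame]) => DataFrame,
               writeSink: (TaskConf, Map[String, DataFrame]) => Unit): Unit = {
      var outputs = Map.empty[String, DataFrame]
    
      job.source.foreach { t =>
        val df = readSource(t)
        df.createOrReplaceTempView(t.name)   // so SQL processes can refer to the task name
        outputs += t.name -> df
      }
      job.process.foreach { t =>
        val df = runProcess(t, t.inputs.map(n => n -> outputs(n)).toMap)
        df.createOrReplaceTempView(t.name)
        outputs += t.name -> df
      }
      job.sink.foreach { t =>
        writeSink(t, t.inputs.map(n => n -> outputs(n)).toMap)
      }
    }
    ```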
    
    ## Data Schema for Kafka 
    When using Kafka as a data source, the consumer needs to parse and interpret the records itself, because Kafka imposes no schema on them.
    
    When reading data from Kafka with Structured Streaming, the DataFrame has the following schema.
    ```
    Column           Type
    key              binary
    value            binary
    topic            string
    partition        int
    offset           long
    timestamp        long
    timestampType    int
    ```
    
    For JSON-formatted values, the data schema can be supplied in the task config.
    You can define the schema by providing the Spark struct type serialized as a JSON string, as shown below.
    ```
    {
      "type": "struct",
      "fields": [
        {
          "name": "timestamp",
          "type": "long",
          "nullable": false,
          "metadata": {}
        },
        {
          "name": "operation",
          "type": "string",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "elem",
          "type": "string",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "from",
          "type": "string",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "to",
          "type": "string",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "label",
          "type": "string",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "service",
          "type": "string",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "props",
          "type": "string",
          "nullable": true,
          "metadata": {}
        }
      ]
    }
    ```
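    
    For reference, a minimal sketch of how such a schema string can be applied to the Kafka value column with standard Spark APIs (variable and function names here are illustrative only):
    ```
    import org.apache.spark.sql.{DataFrame, functions => F}
    import org.apache.spark.sql.types.{DataType, StructType}
    
    // Schema string as in the job description (abbreviated to two fields here).
    val schemaJson =
      """{"type":"struct","fields":[
        |  {"name":"timestamp","type":"long","nullable":false,"metadata":{}},
        |  {"name":"operation","type":"string","nullable":true,"metadata":{}}
        |]}""".stripMargin
    
    // Rebuild the Spark StructType from its JSON representation.
    val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
    
    // Kafka's value column is binary: cast it to string and parse it as JSON.
    def parseKafkaValue(kafkaDf: DataFrame): DataFrame =
      kafkaDf
        .withColumn("parsed", F.from_json(F.col("value").cast("string"), schema))
        .select("parsed.*")
    ```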
    
    
    ## Sample job
    ### wallog transform (kafka to kafka)
    ```
    {
        "name": "kafkaJob",
        "source": [
            {
                "name": "wal",
                "inputs": [],
                "type": "kafka",
                "options": {
                    "kafka.bootstrap.servers" : "localhost:9092",
                    "subscribe": "s2graphInJson",
                    "maxOffsetsPerTrigger": "10000",
                    "format": "json",
                    "schema": "{\"type\":\"struct\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}},{\"name\":\"operation\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"elem\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"from\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"to\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"label\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"service\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"props\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
                }
            }
        ],
        "process": [
            {
                "name": "transform",
                "inputs": ["wal"],
                "type": "sql",
                "options": {
                    "sql": "SELECT timestamp, `from` as userId, to as itemId, label as action FROM wal WHERE label = 'user_action'"
                }
            }
        ],
        "sink": [
            {
                "name": "kafka_sink",
                "inputs": ["transform"],
                "type": "kafka",
                "options": {
                    "kafka.bootstrap.servers" : "localhost:9092",
                    "topic": "s2graphTransform",
                    "format": "json"
                }
            }
        ]
    }
    ```
    
    ### wallog transform (hdfs to hdfs)
    
    ```
    {
        "name": "hdfsJob",
        "source": [
            {
                "name": "wal",
                "inputs": [],
                "type": "file",
                "options": {
                    "paths": "/wal",
                    "format": "parquet"
                }
            }
        ],
        "process": [
            {
                "name": "transform",
                "inputs": ["wal"],
                "type": "sql",
                "options": {
                    "sql": "SELECT timestamp, `from` as userId, to as itemId, label as action FROM wal WHERE label = 'user_action'"
                }
            }
        ],
        "sink": [
            {
                "name": "hdfs_sink",
                "inputs": ["transform"],
                "type": "file",
                "options": {
                    "path": "/wal_transform",
                    "format": "json"
                }
            }
        ]
    }
    ```
    
    ### Launch Job
    
    When submitting the Spark job with the assembly jar, use these parameters together with the job description file path
    (currently only the file type is supported).
    
    ```
    // main class : org.apache.s2graph.s2jobs.JobLauncher
    Usage: run [file|db] [options]
      -n, --name <value>      job display name
    Command: file [options]
    get config from file
      -f, --confFile <file>   configuration file
    Command: db [options]
    get config from db
      -i, --jobId <jobId>     job id
    ```
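    
    For example (illustrative only; the assembly jar name and exact argument order are assumptions):
    ```
    spark-submit --class org.apache.s2graph.s2jobs.JobLauncher \
      s2jobs-assembly.jar \
      file -f job_description.json -n sample_job
    ```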


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/elric-k/incubator-s2graph S2GRAPH-185

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-s2graph/pull/144.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #144
    
----
commit 0b12af9f36465f18a0a7116dd38cb54804d59cb8
Author: elric.kang <el...@...>
Date:   2018-03-22T07:58:18Z

    add simple etl job interface

commit 726b887239b30c8013af90114fee602bb0c6eb17
Author: elric.kang <el...@...>
Date:   2018-03-22T09:53:18Z

    add s2graph sink for spark structured streaming

commit 2dd7d03e481063fb320c1fb1100ff94fb878d828
Author: elric.kang <el...@...>
Date:   2018-03-26T10:38:04Z

    shade protobuf to resolve confilct with spark lib

----


> Support Spark Structured Streaming to work with data in streaming and batch
> ---------------------------------------------------------------------------
>
>                 Key: S2GRAPH-185
>                 URL: https://issues.apache.org/jira/browse/S2GRAPH-185
>             Project: S2Graph
>          Issue Type: New Feature
>            Reporter: Chul Kang
>            Priority: Major
>
> By default, S2Graph publishes all edge/vertex requests to Kafka in WAL format.
>  At Kakao, S2Graph has been used as a master database to store all users' activities.
>  I have been developing several ETL jobs that are suitable for these use cases, and I want to contribute them.
> Use cases are as follows,
> {code:java}
> save edges/vertices incoming through Kafka to other storages
> - druid sink for slice and dice
> - es sink for search
> - file sink to store edges/vertices
> ingest from various storages into s2graph
> - MySQL binlog
> - hdfs/hive/hbase
> ETL jobs on edge/vertex data
> - merge all user activities based on userId
> - generate statistical information
> - apply ML libraries to graph-formatted data
> {code}
>  
> Below are some simple requirements for this,
>  * supports processing of both streaming and static source data
>  * computation flows are reusable and shared between streaming and batch
>  * operated by a simple job description
>  
> Spark Structured Streaming supports a unified API for both streaming and batch by using the DataFrame/Dataset API from Spark SQL.
>  It allows the same operations to be executed on bounded/unbounded data sources and guarantees exactly-once fault tolerance.
>  Structured Streaming provides several built-in data sources and sinks, and it supports implementing custom Source/Sink interfaces.
> Using this, we can easily develop ETL jobs that can be linked to various repositories.
>  
> Reference: [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)