You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@s2graph.apache.org by "DOYUNG YOON (JIRA)" <ji...@apache.org> on 2018/12/27 10:15:00 UTC

[jira] [Updated] (S2GRAPH-252) Improve performance of S2GraphSource

     [ https://issues.apache.org/jira/browse/S2GRAPH-252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

DOYUNG YOON updated S2GRAPH-252:
--------------------------------
    Description: 
S2GraphSource is responsible to translate HBASE snapshot(*TableSnapshotInputFormat*) to graph element such as edge/vertex.

below code create *RDD[(ImmutableBytesWritable, Result)]* from *TableSnapshotInputFormat*

{noformat}
val rdd = ss.sparkContext.newAPIHadoopRDD(job.getConfiguration,
        classOf[TableSnapshotInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])
{noformat}

The problem comes after obtaining RDD. 

Current implementation use *RDD.mapPartitions* because S2Graph class is not serializable, mostly because it has Asynchbase client in it.

The problematic part is the following.

{noformat}
val elements = input.mapPartitions { iter =>
      val s2 = S2GraphHelper.getS2Graph(config)

      iter.flatMap { line =>
        reader.read(s2)(line)
      }
    }

    val kvs = elements.mapPartitions { iter =>
      val s2 = S2GraphHelper.getS2Graph(config)

      iter.map(writer.write(s2)(_))
    }
{noformat}

On each RDD partition, S2Graph instance connect meta storage, such as mysql, and use the local cache to avoid heavy read from meta storage.

Even though it works with a dataset with the small partition, the scalability of S2GraphSource limited by the number of partitions, which need to be increased when dealing with large data.

Possible improvement can be achieved by not depending on meta storage when it deserializes HBase's Result class into Edge/Vertex. 

We can simply achieve this by loading all necessary schemas from meta storage on spark driver, then broadcast these schemas and use them to deserialize instead of connecting meta storage on each partition.





  was:
S2GraphSource is responsible to translate HBASE snapshot(`TableSnapshotInputFormat`) to graph element such as edge/vertex.

below code create `RDD[(ImmutableBytesWritable, Result)]` from TableSnapshotInputFormat

{noformat}
val rdd = ss.sparkContext.newAPIHadoopRDD(job.getConfiguration,
        classOf[TableSnapshotInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])
{noformat}

The problem comes after obtaining RDD. 

Current implementation use `RDD.mapPartitions` because S2Graph class is not serializable, mostly because it has Asynchbase client in it.

The problematic part is the following.

{noformat}
val elements = input.mapPartitions { iter =>
      val s2 = S2GraphHelper.getS2Graph(config)

      iter.flatMap { line =>
        reader.read(s2)(line)
      }
    }

    val kvs = elements.mapPartitions { iter =>
      val s2 = S2GraphHelper.getS2Graph(config)

      iter.map(writer.write(s2)(_))
    }
{noformat}

On each RDD partition, S2Graph instance connect meta storage, such as mysql, and use the local cache to avoid heavy read from meta storage.

Even though it works with a dataset with the small partition, the scalability of S2GraphSource limited by the number of partitions, which need to be increased when dealing with large data.

Possible improvement can be achieved by not depending on meta storage when it deserializes HBase's Result class into Edge/Vertex. 

We can simply achieve this by loading all necessary schemas from meta storage on spark driver, then broadcast these schemas and use them to deserialize instead of connecting meta storage on each partition.






> Improve performance of S2GraphSource 
> -------------------------------------
>
>                 Key: S2GRAPH-252
>                 URL: https://issues.apache.org/jira/browse/S2GRAPH-252
>             Project: S2Graph
>          Issue Type: Improvement
>          Components: s2jobs
>            Reporter: DOYUNG YOON
>            Assignee: DOYUNG YOON
>            Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> S2GraphSource is responsible to translate HBASE snapshot(*TableSnapshotInputFormat*) to graph element such as edge/vertex.
> below code create *RDD[(ImmutableBytesWritable, Result)]* from *TableSnapshotInputFormat*
> {noformat}
> val rdd = ss.sparkContext.newAPIHadoopRDD(job.getConfiguration,
>         classOf[TableSnapshotInputFormat],
>         classOf[ImmutableBytesWritable],
>         classOf[Result])
> {noformat}
> The problem comes after obtaining RDD. 
> Current implementation use *RDD.mapPartitions* because S2Graph class is not serializable, mostly because it has Asynchbase client in it.
> The problematic part is the following.
> {noformat}
> val elements = input.mapPartitions { iter =>
>       val s2 = S2GraphHelper.getS2Graph(config)
>       iter.flatMap { line =>
>         reader.read(s2)(line)
>       }
>     }
>     val kvs = elements.mapPartitions { iter =>
>       val s2 = S2GraphHelper.getS2Graph(config)
>       iter.map(writer.write(s2)(_))
>     }
> {noformat}
> On each RDD partition, S2Graph instance connect meta storage, such as mysql, and use the local cache to avoid heavy read from meta storage.
> Even though it works with a dataset with the small partition, the scalability of S2GraphSource limited by the number of partitions, which need to be increased when dealing with large data.
> Possible improvement can be achieved by not depending on meta storage when it deserializes HBase's Result class into Edge/Vertex. 
> We can simply achieve this by loading all necessary schemas from meta storage on spark driver, then broadcast these schemas and use them to deserialize instead of connecting meta storage on each partition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)