You are viewing a plain-text version of this content. The canonical link to the original (which appeared here as a hyperlink) was not preserved in this plain-text copy.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/23 04:36:46 UTC

[8/9] incubator-s2graph git commit: Merge remote-tracking branch 'apache/master' into S2GRAPH-201

Merge remote-tracking branch 'apache/master' into S2GRAPH-201


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4e758c46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4e758c46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4e758c46

Branch: refs/heads/master
Commit: 4e758c4627187bda7c82fbfb9db8747bffe26448
Parents: 162b460 9dc39ee
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Apr 23 11:44:06 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Apr 23 12:21:50 2018 +0900

----------------------------------------------------------------------
 CHANGES                                         |   7 +
 build.sbt                                       |  36 ++
 dev_support/README.md                           |  19 +-
 project/plugins.sbt                             |   2 +
 .../scala/org/apache/s2graph/core/S2Edge.scala  | 108 +++---
 .../tall/SnapshotEdgeDeserializable.scala       |   5 +-
 .../apache/s2graph/graphql/GraphQLServer.scala  |   7 +-
 .../org/apache/s2graph/graphql/HttpServer.scala |   4 +-
 .../apache/s2graph/graphql/bind/AstHelper.scala |  28 ++
 .../s2graph/graphql/bind/Unmarshaller.scala     | 127 +++++++
 .../s2graph/graphql/marshaller/package.scala    | 124 -------
 .../graphql/repository/GraphRepository.scala    |  62 +++-
 .../s2graph/graphql/resolver/Resolver.scala     |  28 --
 .../s2graph/graphql/types/FieldResolver.scala   |  96 ++++++
 .../s2graph/graphql/types/ManagementType.scala  | 312 +++++++++++++++++
 .../graphql/types/S2ManagementType.scala        | 339 -------------------
 .../apache/s2graph/graphql/types/S2Type.scala   | 219 ++++++------
 .../types/SangriaPlayJsonScalarType.scala       |  76 -----
 .../s2graph/graphql/types/SchemaDef.scala       |   4 +-
 .../s2graph/graphql/types/StaticType.scala      | 149 ++++++++
 .../apache/s2graph/graphql/types/package.scala  | 122 +------
 .../apache/s2graph/graphql/ScenarioTest.scala   |  52 +--
 .../org/apache/s2graph/graphql/SchemaTest.scala |  13 +-
 .../org/apache/s2graph/graphql/TestGraph.scala  |  19 +-
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  12 +-
 .../s2jobs/loader/GraphFileOptions.scala        |  19 +-
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |   8 +-
 .../s2jobs/loader/HFileMRGenerator.scala        |   5 +-
 .../loader/LocalBulkLoaderTransformer.scala     |   7 +-
 .../loader/SparkBulkLoaderTransformer.scala     |  19 +-
 .../s2graph/s2jobs/serde/Transformer.scala      |   1 -
 .../serde/reader/RowBulkFormatReader.scala      |   4 +-
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  19 +-
 .../serde/writer/RowDataFrameWriter.scala       |  19 ++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |   4 +-
 .../spark/sql/streaming/S2SinkContext.scala     |   2 +
 .../sql/streaming/S2StreamQueryWriter.scala     |   5 +-
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |   7 +-
 .../s2graph/s2jobs/task/TaskConfTest.scala      |  19 ++
 s2rest_netty/build.sbt                          |   2 -
 s2rest_play/build.sbt                           |   2 -
 41 files changed, 1170 insertions(+), 942 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/CHANGES
----------------------------------------------------------------------
diff --cc CHANGES
index 077f641,f61461c..f8887f6
--- a/CHANGES
+++ b/CHANGES
@@@ -35,6 -35,7 +35,8 @@@ Release Notes - S2Graph - Version 0.2.
      * [S2GRAPH-163] - Update version.sbt after release
      * [S2GRAPH-180] - Implement missing Management API
      * [S2GRAPH-197] - Provide S2graphSink for non-streaming dataset
+     * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
++    * [S2GRAPH-201] - Provide S2GraphSource
  
  ** Bug
      * [S2GRAPH-159] - Wrong syntax at a bash script under Linux

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index ade62fa,b65af21..b0b4aed
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@@ -29,11 -31,38 +29,17 @@@ import play.api.libs.json.{JsObject, Js
  import scala.concurrent.ExecutionContext
  import scala.util.Try
  
- object S2GraphHelper {
-   def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
-     new S2Graph(config)
+ object S2GraphHelper extends Logger {
+   private var s2Graph:S2Graph = null
+ 
+   def getS2Graph(config: Config, init:Boolean = false)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
+     if (s2Graph == null || init) {
+       logger.info(s"S2Graph initialized..")
+       s2Graph = new S2Graph(config)
+     }
+     s2Graph
    }
  
 -  def buildDegreePutRequests(s2: S2Graph,
 -                             vertexId: String,
 -                             labelName: String,
 -                             direction: String,
 -                             degreeVal: Long): Seq[SKeyValue] = {
 -    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
 -    val dir = GraphUtil.directions(direction)
 -    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
 -      throw new RuntimeException(s"$vertexId can not be converted into innerval")
 -    }
 -    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
 -
 -    val ts = System.currentTimeMillis()
 -    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
 -    val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
 -
 -    edge.edgesWithIndex.flatMap { indexEdge =>
 -      s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
 -    }
 -  }
 -
    private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
      val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
  

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index 95847f9,36b585e..bf3d25c
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@@ -29,22 -28,14 +28,18 @@@ import org.apache.spark.rdd.RD
  import scala.reflect.ClassTag
  
  class SparkBulkLoaderTransformer(val config: Config,
 -                                 val options: GraphFileOptions) extends Transformer[RDD] {
 +                                 val labelMapping: Map[String, String] = Map.empty,
 +                                 val buildDegree: Boolean = false) extends Transformer[RDD] {
 +
-   val GraphElementEncoder = org.apache.spark.sql.Encoders.kryo[GraphElement]
- 
-   implicit val encoder = GraphElementEncoder
- 
 +  def this(config: Config, options: GraphFileOptions) =
 +    this(config, options.labelMapping, options.buildDegree)
  
    override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
      val degrees = elements.mapPartitions { iter =>
-       val s2 = S2SinkContext(config).getGraph
+       val s2 = S2GraphHelper.getS2Graph(config)
  
        iter.flatMap { element =>
-         DegreeKey.fromGraphElement(s2, element).map(_ -> 1L)
 -        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
++        DegreeKey.fromGraphElement(s2, element, labelMapping).map(_ -> 1L)
        }
      }.reduceByKey(_ + _)
  

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
index 7d2b981,0000000..be23628
mode 100644,000000..100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
@@@ -1,17 -1,0 +1,36 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ * 
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
 +package org.apache.s2graph.s2jobs.serde.writer
 +
 +import org.apache.s2graph.core.{GraphElement, S2Graph}
 +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
 +import org.apache.s2graph.s2jobs.serde.GraphElementWritable
 +import org.apache.spark.sql.Row
 +
 +class RowDataFrameWriter extends GraphElementWritable[Row]{
 +  override def write(s2: S2Graph)(element: GraphElement): Row = {
 +    S2GraphHelper.graphElementToSparkSqlRow(s2, element)
 +  }
 +
 +  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Row = {
 +    val element = DegreeKey.toEdge(s2, degreeKey, count)
 +    S2GraphHelper.graphElementToSparkSqlRow(s2, element)
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
index 95e2bac,0000000..98b05ac
mode 100644,000000..100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
@@@ -1,42 -1,0 +1,61 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ * 
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
 +package org.apache.s2graph.s2jobs.task
 +
 +import org.apache.s2graph.s2jobs.BaseSparkTest
 +import play.api.libs.json.Json
 +
 +class TaskConfTest extends BaseSparkTest {
 +  test("parse dump loader TaskConf") {
 +    val s =
 +      """
 +        |{
 +        |        "name": "s2graph_sink",
 +        |        "inputs": [
 +        |          "filter"
 +        |        ],
 +        |        "type": "s2graph",
 +        |        "options": {
 +        |          "writeMethod": "bulk",
 +        |          "hbase.zookeeper.quorum": "localhost",
 +        |          "db.default.driver": "com.mysql.jdbc.Driver",
 +        |          "db.default.url": "jdbc:mysql://localhost:3306/graph_dev",
 +        |          "db.default.user": "graph",
 +        |          "db.default.password": "graph",
 +        |          "--input": "dummy",
 +        |          "--tempDir": "dummy",
 +        |          "--output": "/tmp/HTableMigrate",
 +        |          "--zkQuorum": "localhost",
 +        |          "--table": "CopyRated",
 +        |          "--dbUrl": "jdbc:mysql://localhost:3306/graph_dev",
 +        |          "--dbUser": "graph",
 +        |          "--dbPassword": "graph",
 +        |          "--dbDriver": "com.mysql.jdbc.Driver",
 +        |          "--autoEdgeCreate": "true",
 +        |          "--buildDegree": "true"
 +        |        }
 +        |      }
 +      """.stripMargin
 +
 +    implicit val TaskConfReader = Json.reads[TaskConf]
 +    val taskConf = Json.parse(s).as[TaskConf]
 +
 +  }
 +}