You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/23 04:36:46 UTC
[8/9] incubator-s2graph git commit: Merge remote-tracking branch
'apache/master' into S2GRAPH-201
Merge remote-tracking branch 'apache/master' into S2GRAPH-201
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4e758c46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4e758c46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4e758c46
Branch: refs/heads/master
Commit: 4e758c4627187bda7c82fbfb9db8747bffe26448
Parents: 162b460 9dc39ee
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Apr 23 11:44:06 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Apr 23 12:21:50 2018 +0900
----------------------------------------------------------------------
CHANGES | 7 +
build.sbt | 36 ++
dev_support/README.md | 19 +-
project/plugins.sbt | 2 +
.../scala/org/apache/s2graph/core/S2Edge.scala | 108 +++---
.../tall/SnapshotEdgeDeserializable.scala | 5 +-
.../apache/s2graph/graphql/GraphQLServer.scala | 7 +-
.../org/apache/s2graph/graphql/HttpServer.scala | 4 +-
.../apache/s2graph/graphql/bind/AstHelper.scala | 28 ++
.../s2graph/graphql/bind/Unmarshaller.scala | 127 +++++++
.../s2graph/graphql/marshaller/package.scala | 124 -------
.../graphql/repository/GraphRepository.scala | 62 +++-
.../s2graph/graphql/resolver/Resolver.scala | 28 --
.../s2graph/graphql/types/FieldResolver.scala | 96 ++++++
.../s2graph/graphql/types/ManagementType.scala | 312 +++++++++++++++++
.../graphql/types/S2ManagementType.scala | 339 -------------------
.../apache/s2graph/graphql/types/S2Type.scala | 219 ++++++------
.../types/SangriaPlayJsonScalarType.scala | 76 -----
.../s2graph/graphql/types/SchemaDef.scala | 4 +-
.../s2graph/graphql/types/StaticType.scala | 149 ++++++++
.../apache/s2graph/graphql/types/package.scala | 122 +------
.../apache/s2graph/graphql/ScenarioTest.scala | 52 +--
.../org/apache/s2graph/graphql/SchemaTest.scala | 13 +-
.../org/apache/s2graph/graphql/TestGraph.scala | 19 +-
.../apache/s2graph/s2jobs/S2GraphHelper.scala | 12 +-
.../s2jobs/loader/GraphFileOptions.scala | 19 +-
.../s2graph/s2jobs/loader/HFileGenerator.scala | 8 +-
.../s2jobs/loader/HFileMRGenerator.scala | 5 +-
.../loader/LocalBulkLoaderTransformer.scala | 7 +-
.../loader/SparkBulkLoaderTransformer.scala | 19 +-
.../s2graph/s2jobs/serde/Transformer.scala | 1 -
.../serde/reader/RowBulkFormatReader.scala | 4 +-
.../s2jobs/serde/writer/KeyValueWriter.scala | 19 +-
.../serde/writer/RowDataFrameWriter.scala | 19 ++
.../org/apache/s2graph/s2jobs/task/Sink.scala | 4 +-
.../spark/sql/streaming/S2SinkContext.scala | 2 +
.../sql/streaming/S2StreamQueryWriter.scala | 5 +-
.../apache/s2graph/s2jobs/BaseSparkTest.scala | 7 +-
.../s2graph/s2jobs/task/TaskConfTest.scala | 19 ++
s2rest_netty/build.sbt | 2 -
s2rest_play/build.sbt | 2 -
41 files changed, 1170 insertions(+), 942 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/CHANGES
----------------------------------------------------------------------
diff --cc CHANGES
index 077f641,f61461c..f8887f6
--- a/CHANGES
+++ b/CHANGES
@@@ -35,6 -35,7 +35,8 @@@ Release Notes - S2Graph - Version 0.2.
* [S2GRAPH-163] - Update version.sbt after release
* [S2GRAPH-180] - Implement missing Management API
* [S2GRAPH-197] - Provide S2graphSink for non-streaming dataset
+ * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
++ * [S2GRAPH-201] - Provide S2GraphSource
** Bug
* [S2GRAPH-159] - Wrong syntax at a bash script under Linux
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index ade62fa,b65af21..b0b4aed
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@@ -29,11 -31,38 +29,17 @@@ import play.api.libs.json.{JsObject, Js
import scala.concurrent.ExecutionContext
import scala.util.Try
- object S2GraphHelper {
- def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
- new S2Graph(config)
+ object S2GraphHelper extends Logger {
+ private var s2Graph:S2Graph = null
+
+ def getS2Graph(config: Config, init:Boolean = false)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
+ if (s2Graph == null || init) {
+ logger.info(s"S2Graph initialized..")
+ s2Graph = new S2Graph(config)
+ }
+ s2Graph
}
- def buildDegreePutRequests(s2: S2Graph,
- vertexId: String,
- labelName: String,
- direction: String,
- degreeVal: Long): Seq[SKeyValue] = {
- val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
- val dir = GraphUtil.directions(direction)
- val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
- throw new RuntimeException(s"$vertexId can not be converted into innerval")
- }
- val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
-
- val ts = System.currentTimeMillis()
- val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
- val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
-
- edge.edgesWithIndex.flatMap { indexEdge =>
- s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
- }
- }
-
private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index 95847f9,36b585e..bf3d25c
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@@ -29,22 -28,14 +28,18 @@@ import org.apache.spark.rdd.RD
import scala.reflect.ClassTag
class SparkBulkLoaderTransformer(val config: Config,
- val options: GraphFileOptions) extends Transformer[RDD] {
+ val labelMapping: Map[String, String] = Map.empty,
+ val buildDegree: Boolean = false) extends Transformer[RDD] {
+
- val GraphElementEncoder = org.apache.spark.sql.Encoders.kryo[GraphElement]
-
- implicit val encoder = GraphElementEncoder
-
+ def this(config: Config, options: GraphFileOptions) =
+ this(config, options.labelMapping, options.buildDegree)
override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
val degrees = elements.mapPartitions { iter =>
- val s2 = S2SinkContext(config).getGraph
+ val s2 = S2GraphHelper.getS2Graph(config)
iter.flatMap { element =>
- DegreeKey.fromGraphElement(s2, element).map(_ -> 1L)
- DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
++ DegreeKey.fromGraphElement(s2, element, labelMapping).map(_ -> 1L)
}
}.reduceByKey(_ + _)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
index 7d2b981,0000000..be23628
mode 100644,000000..100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
@@@ -1,17 -1,0 +1,36 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+import org.apache.spark.sql.Row
+
+class RowDataFrameWriter extends GraphElementWritable[Row]{
+ override def write(s2: S2Graph)(element: GraphElement): Row = {
+ S2GraphHelper.graphElementToSparkSqlRow(s2, element)
+ }
+
+ override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Row = {
+ val element = DegreeKey.toEdge(s2, degreeKey, count)
+ S2GraphHelper.graphElementToSparkSqlRow(s2, element)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
index 95e2bac,0000000..98b05ac
mode 100644,000000..100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
@@@ -1,42 -1,0 +1,61 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import play.api.libs.json.Json
+
+class TaskConfTest extends BaseSparkTest {
+ test("parse dump loader TaskConf") {
+ val s =
+ """
+ |{
+ | "name": "s2graph_sink",
+ | "inputs": [
+ | "filter"
+ | ],
+ | "type": "s2graph",
+ | "options": {
+ | "writeMethod": "bulk",
+ | "hbase.zookeeper.quorum": "localhost",
+ | "db.default.driver": "com.mysql.jdbc.Driver",
+ | "db.default.url": "jdbc:mysql://localhost:3306/graph_dev",
+ | "db.default.user": "graph",
+ | "db.default.password": "graph",
+ | "--input": "dummy",
+ | "--tempDir": "dummy",
+ | "--output": "/tmp/HTableMigrate",
+ | "--zkQuorum": "localhost",
+ | "--table": "CopyRated",
+ | "--dbUrl": "jdbc:mysql://localhost:3306/graph_dev",
+ | "--dbUser": "graph",
+ | "--dbPassword": "graph",
+ | "--dbDriver": "com.mysql.jdbc.Driver",
+ | "--autoEdgeCreate": "true",
+ | "--buildDegree": "true"
+ | }
+ | }
+ """.stripMargin
+
+ implicit val TaskConfReader = Json.reads[TaskConf]
+ val taskConf = Json.parse(s).as[TaskConf]
+
+ }
+}