You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/04 02:45:02 UTC

[06/11] incubator-s2graph git commit: modify tc

modify tc


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c47359c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c47359c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c47359c1

Branch: refs/heads/master
Commit: c47359c12796521106884b09d464099a64924f5d
Parents: 7ad721c
Author: Chul Kang <el...@apache.org>
Authored: Tue Apr 3 14:54:27 2018 +0900
Committer: Chul Kang <el...@apache.org>
Committed: Tue Apr 3 14:54:27 2018 +0900

----------------------------------------------------------------------
 .../s2graph/s2jobs/S2GraphHelperTest.scala      | 13 +++++++++-
 .../apache/s2graph/s2jobs/task/SinkTest.scala   | 27 +++++++++++++++++---
 2 files changed, 36 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c47359c1/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
index 6e89466..f2b0102 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -19,6 +19,17 @@
 
 package org.apache.s2graph.s2jobs
 
-class S2GraphHelperTest {
+import org.apache.s2graph.s2jobs.task.TaskConf
 
+class S2GraphHelperTest extends BaseSparkTest {
+  test("toGraphFileOptions") {
+    val args = options.toCommand.grouped(2).map { kv =>
+          kv.head -> kv.last
+      }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql")
+
+    println(args)
+    val taskConf = TaskConf("dummy", "sink", Nil, args)
+    val graphFileOptions = S2GraphHelper.toGraphFileOptions(taskConf)
+    println(graphFileOptions)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c47359c1/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index a21b3df..db96328 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -42,12 +42,13 @@ class SinkTest extends BaseSparkTest {
     }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
   }
 
-  test("S2graphSink writeBatch.") {
-    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+  test("S2graphSink writeBatchWithBulkload") {
+    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
 
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
     val df = toDataFrame(Seq(bulkEdgeString))
-    val args = options.toCommand.grouped(2).map { kv =>
+    val args = Map("writeMethod" -> "bulk") ++
+      options.toCommand.grouped(2).map { kv =>
       kv.head -> kv.last
     }.toMap
 
@@ -59,4 +60,24 @@ class SinkTest extends BaseSparkTest {
     val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
     s2Edges.foreach { edge => println(edge) }
   }
+
+  test("S2graphSink writeBatchWithMutate") {
+    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":20}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = Map("writeMethod" -> "mutate") ++
+      options.toCommand.grouped(2).map { kv =>
+        kv.head -> kv.last
+      }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+  }
+
 }