You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/05 05:43:11 UTC

[1/4] incubator-s2graph git commit: add skipError option to skip over not-serializable data.

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 1799ae456 -> 5a0e4d835


add skipError option to skip over not-serializable data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/63dd6fa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/63dd6fa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/63dd6fa2

Branch: refs/heads/master
Commit: 63dd6fa23803f1a76a86f8e53c6115c4dd15cbf9
Parents: 3332f6b
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Apr 2 13:57:15 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Apr 2 13:57:15 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   | 28 +++---
 .../s2jobs/loader/GraphFileOptions.scala        | 14 +--
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  1 +
 .../s2jobs/loader/HFileMRGenerator.scala        |  1 +
 .../loader/LocalBulkLoaderTransformer.scala     | 61 -------------
 .../loader/SparkBulkLoaderTransformer.scala     | 76 ----------------
 .../serde/LocalBulkLoaderTransformer.scala      | 61 +++++++++++++
 .../serde/SparkBulkLoaderTransformer.scala      | 76 ++++++++++++++++
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  5 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 95 ++++++++++----------
 10 files changed, 214 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 3f80e8f..383f39f 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,6 +24,7 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 import play.api.libs.json.Json
 
 import scala.concurrent.ExecutionContext
@@ -54,8 +55,8 @@ object S2GraphHelper {
     }
   }
 
-  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
-    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
+  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, option: GraphFileOptions): Seq[SKeyValue] = {
+    val relEdges = if (option.autoEdgeCreate) edge.relatedEdges else List(edge)
 
     val snapshotEdgeKeyValues = s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues
     val indexEdgeKeyValues = relEdges.flatMap { edge =>
@@ -67,15 +68,20 @@ object S2GraphHelper {
     snapshotEdgeKeyValues ++ indexEdgeKeyValues
   }
 
-  def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = {
-    if (element.isInstanceOf[S2Edge]) {
-      val edge = element.asInstanceOf[S2Edge]
-      insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
-    } else if (element.isInstanceOf[S2Vertex]) {
-      val vertex = element.asInstanceOf[S2Vertex]
-      s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
-    } else {
-      Nil
+  def toSKeyValues(s2: S2Graph, element: GraphElement, option: GraphFileOptions): Seq[SKeyValue] = {
+    try {
+      if (element.isInstanceOf[S2Edge]) {
+        val edge = element.asInstanceOf[S2Edge]
+        insertBulkForLoaderAsync(s2, edge, option)
+      } else if (element.isInstanceOf[S2Vertex]) {
+        val vertex = element.asInstanceOf[S2Vertex]
+        s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
+      } else {
+        Nil
+      }
+    } catch {
+      case e: Exception =>
+        if (option.skipError) Nil else throw e
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index 3e3ffb9..e855a32 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -50,12 +50,10 @@ object GraphFileOptions {
       c.copy(dbDriver = x)).text("jdbc driver class.")
 
     opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) =>
-      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer."
-    )
+      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer.")
 
     opt[Int]('n', "numRegions").action ( (x, c) =>
-      c.copy(numRegions = x)).text("total numRegions(pre-split size) on table."
-    )
+      c.copy(numRegions = x)).text("total numRegions(pre-split size) on table.")
 
     opt[String]('l', "labelMapping").action( (x, c) =>
       c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)")
@@ -67,8 +65,11 @@ object GraphFileOptions {
       c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
 
     opt[Boolean]('c', "incrementalLoad").action( (x, c) =>
-      c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not."
-    )
+      c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not.")
+
+    opt[Boolean]('s', "skipError").action ((x, c) =>
+      c.copy(skipError = x)).text("whether skip error row.")
+
     opt[String]('m', "method").action( (x, c) =>
       c.copy(method = x)).text("run method. currently MR(default)/SPARK supported."
     )
@@ -124,6 +125,7 @@ case class GraphFileOptions(input: String = "",
                             autoEdgeCreate: Boolean = false,
                             buildDegree: Boolean = false,
                             incrementalLoad: Boolean = false,
+                            skipError: Boolean = false,
                             compressionAlgorithm: String = "NONE",
                             method: String = "SPARK") {
   def toConfigParams = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index b4ac51f..8ace94a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index 3502bee..fd78718 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
 import org.apache.hadoop.mapreduce.{Job, Mapper}
+import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
deleted file mode 100644
index 7d405a6..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.loader
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.KeyValue
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-
-import scala.concurrent.ExecutionContext
-
-class LocalBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
-  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
-
-  override val reader = new TsvBulkFormatReader
-  override val writer = new KeyValueWriter
-
-  override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
-
-  override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
-
-  override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
-    val degrees = elements.flatMap { element =>
-      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
-    }.groupBy(_._1).mapValues(_.map(_._2).sum)
-
-    degrees.toSeq.map { case (degreeKey, count) =>
-      DegreeKey.toKeyValue(s2, degreeKey, count)
-    }
-  }
-
-  override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
-
-    kvs ++ degrees
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
deleted file mode 100644
index cd991e1..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.loader
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-import org.apache.spark.rdd.RDD
-
-class SparkBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
-  val reader = new TsvBulkFormatReader
-
-  val writer = new KeyValueWriter
-
-  override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.flatMap { line =>
-      reader.read(s2)(line)
-    }
-  }
-
-  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.map(writer.write(s2)(_))
-  }
-
-  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
-    val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.flatMap { element =>
-        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
-      }
-    }.reduceByKey(_ + _)
-
-    degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.map { case (degreeKey, count) =>
-        DegreeKey.toKeyValue(s2, degreeKey, count)
-      }
-    }
-  }
-
-  override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    if (options.buildDegree) kvs ++ buildDegrees(elements)
-    kvs
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
new file mode 100644
index 0000000..a185754
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.KeyValue
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+
+import scala.concurrent.ExecutionContext
+
+class LocalBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
+  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+
+  override val reader = new TsvBulkFormatReader
+  override val writer = new KeyValueWriter(options)
+
+  override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
+
+  override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
+
+  override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
+    val degrees = elements.flatMap { element =>
+      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+    }.groupBy(_._1).mapValues(_.map(_._2).sum)
+
+    degrees.toSeq.map { case (degreeKey, count) =>
+      DegreeKey.toKeyValue(s2, degreeKey, count)
+    }
+  }
+
+  override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
+
+    kvs ++ degrees
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
new file mode 100644
index 0000000..63f4e2c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+
+class SparkBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
+  val reader = new TsvBulkFormatReader
+
+  val writer = new KeyValueWriter(options)
+
+  override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.flatMap { line =>
+      reader.read(s2)(line)
+    }
+  }
+
+  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.map(writer.write(s2)(_))
+  }
+
+  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
+    val degrees = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { element =>
+        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+      }
+    }.reduceByKey(_ + _)
+
+    degrees.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map { case (degreeKey, count) =>
+        DegreeKey.toKeyValue(s2, degreeKey, count)
+      }
+    }
+  }
+
+  override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    if (options.buildDegree) kvs ++ buildDegrees(elements)
+    kvs
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
index 02034af..22eee34 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
@@ -22,11 +22,12 @@ package org.apache.s2graph.s2jobs.serde.writer
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 import org.apache.s2graph.s2jobs.serde.GraphElementWritable
 
-class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] {
+class KeyValueWriter(option: GraphFileOptions) extends GraphElementWritable[Seq[KeyValue]] {
   override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = {
-    S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv =>
+    S2GraphHelper.toSKeyValues(s2, element, option).map { skv =>
       new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 3fbbd88..3bd1a23 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,20 +19,18 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{PostProcess, S2VertexLike}
+import org.apache.s2graph.core.PostProcess
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.s2graph.s2jobs.serde.{LocalBulkLoaderTransformer, SparkBulkLoaderTransformer}
 import play.api.libs.json.Json
 
-import scala.io.Source
-
 class GraphFileGeneratorTest extends BaseSparkTest {
-  import scala.concurrent.ExecutionContext.Implicits.global
+
   import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
 
+  import scala.concurrent.ExecutionContext.Implicits.global
+
   def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = {
     transformerMode match {
       case "spark" =>
@@ -56,6 +54,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
         }.toList
     }
   }
+
   test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
     /* end of initialize model */
@@ -121,7 +120,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       println(Json.prettyPrint(jsValue))
     }
 
-    bulkVertex shouldBe(vertex)
+    bulkVertex shouldBe (vertex)
   }
 
   test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
@@ -140,46 +139,46 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       println(Json.prettyPrint(jsValue))
     }
 
-    bulkVertex shouldBe(vertex)
+    bulkVertex shouldBe (vertex)
   }
 
-//   this test case expect options.input already exist with valid bulk load format.
-//  test("bulk load and fetch vertex: spark mode") {
-//    import scala.collection.JavaConverters._
-//    val serviceColumn = initTestVertexSchema(s2)
-//
-//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//    val input = sc.parallelize(bulkVertexLs)
-//
-//    HFileGenerator.generate(sc, s2Config, input, options)
-//
-//    val hfileArgs = Array(options.output, options.tableName)
-//    val hbaseConfig = HBaseConfiguration.create()
-//
-//    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//
-//    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//    val json = PostProcess.verticesToJson(s2Vertices)
-//
-//    println(Json.prettyPrint(json))
-//  }
-
-//   this test case expect options.input already exist with valid bulk load format.
-//  test("bulk load and fetch vertex: mr mode") {
-//    val serviceColumn = initTestVertexSchema(s2)
-//
-//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//    val input = sc.parallelize(bulkVertexLs)
-//
-//    HFileMRGenerator.generate(sc, s2Config, input, options)
-//
-//    val hfileArgs = Array(options.output, options.tableName)
-//    val hbaseConfig = HBaseConfiguration.create()
-//
-//    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//    val json = PostProcess.verticesToJson(s2Vertices)
-//
-//    println(Json.prettyPrint(json))
-//  }
+  //   this test case expect options.input already exist with valid bulk load format.
+  //  test("bulk load and fetch vertex: spark mode") {
+  //    import scala.collection.JavaConverters._
+  //    val serviceColumn = initTestVertexSchema(s2)
+  //
+  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //    val input = sc.parallelize(bulkVertexLs)
+  //
+  //    HFileGenerator.generate(sc, s2Config, input, options)
+  //
+  //    val hfileArgs = Array(options.output, options.tableName)
+  //    val hbaseConfig = HBaseConfiguration.create()
+  //
+  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //
+  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //    val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //    println(Json.prettyPrint(json))
+  //  }
+
+  //   this test case expect options.input already exist with valid bulk load format.
+  //  test("bulk load and fetch vertex: mr mode") {
+  //    val serviceColumn = initTestVertexSchema(s2)
+  //
+  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //    val input = sc.parallelize(bulkVertexLs)
+  //
+  //    HFileMRGenerator.generate(sc, s2Config, input, options)
+  //
+  //    val hfileArgs = Array(options.output, options.tableName)
+  //    val hbaseConfig = HBaseConfiguration.create()
+  //
+  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //    val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //    println(Json.prettyPrint(json))
+  //  }
 }


[3/4] incubator-s2graph git commit: Merge branch 'master' into S2GRAPH-198

Posted by st...@apache.org.
Merge branch 'master' into S2GRAPH-198


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/acf5ccf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/acf5ccf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/acf5ccf4

Branch: refs/heads/master
Commit: acf5ccf4b806d8f9699802201b98c9bab4dbff2e
Parents: 1b8a13c 1799ae4
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Apr 5 13:30:17 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Apr 5 13:32:33 2018 +0900

----------------------------------------------------------------------
 CHANGES                                         |  1 +
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 10 +-
 .../s2jobs/dump/GraphFileDumperTest.scala       | 97 --------------------
 .../apache/s2graph/s2jobs/task/SinkTest.scala   | 10 +-
 4 files changed, 13 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/acf5ccf4/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------


[4/4] incubator-s2graph git commit: [S2GRAPH-198]: Skip json Decode error and continue

Posted by st...@apache.org.
[S2GRAPH-198]: Skip json Decode error and continue

JIRA:
    [S2GRAPH-198] https://issues.apache.org/jira/browse/S2GRAPH-198

Pull Request:
    Closes #150

Author
    DO YUNG YOON <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5a0e4d83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5a0e4d83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5a0e4d83

Branch: refs/heads/master
Commit: 5a0e4d83575beab279bd98181ff852b5f01dd7c3
Parents: acf5ccf
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Apr 5 14:42:39 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Apr 5 14:42:39 2018 +0900

----------------------------------------------------------------------
 CHANGES | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a0e4d83/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 077f641..775ccfa 100644
--- a/CHANGES
+++ b/CHANGES
@@ -65,6 +65,7 @@ Release Notes - S2Graph - Version 0.2.0
     * [S2GRAPH-169] - Separate multiple functionalities on Storage class into multiple Interface.
     * [S2GRAPH-170] - Create Interface for S2Edge/S2Vertex/S2Graph.
     * [S2GRAPH-182] - Version up spark dependencies.
+    * [S2GRAPH-198] - Skip json Decode error and continue.
 
 ** New Feature
     * [S2GRAPH-123] - Support different index on out/in direction.


[2/4] incubator-s2graph git commit: Merge upstream master.

Posted by st...@apache.org.
Merge upstream master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1b8a13c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1b8a13c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1b8a13c3

Branch: refs/heads/master
Commit: 1b8a13c315eb054efa40f25d6dddde85ce2672af
Parents: 63dd6fa
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Apr 5 13:22:13 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Apr 5 13:22:13 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  77 ++++++++++---
 .../s2jobs/loader/GraphFileOptions.scala        |  31 +++++
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  23 +++-
 .../s2jobs/loader/HFileMRGenerator.scala        |  10 +-
 .../loader/LocalBulkLoaderTransformer.scala     |  51 +++++++++
 .../loader/SparkBulkLoaderTransformer.scala     |  69 +++++++++++
 .../s2jobs/serde/GraphElementWritable.scala     |   4 +
 .../serde/LocalBulkLoaderTransformer.scala      |  61 ----------
 .../serde/SparkBulkLoaderTransformer.scala      |  76 -------------
 .../s2graph/s2jobs/serde/Transformer.scala      |  21 +---
 .../serde/reader/RowBulkFormatReader.scala      |  33 ++++++
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  24 +++-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 111 +++++++++++++++---
 .../org/apache/s2graph/s2jobs/task/Task.scala   |   9 ++
 .../sql/streaming/S2StreamQueryWriter.scala     |  50 +-------
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |  15 +--
 .../s2graph/s2jobs/S2GraphHelperTest.scala      |  13 ++-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 113 ++++++++++++-------
 .../apache/s2graph/s2jobs/task/SinkTest.scala   |  98 ++++++++++++++++
 19 files changed, 591 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 383f39f..6e68d28 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,10 +24,12 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import play.api.libs.json.Json
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+import play.api.libs.json.{JsObject, Json}
 
 import scala.concurrent.ExecutionContext
+import scala.util.Try
 
 object S2GraphHelper {
   def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
@@ -55,8 +57,8 @@ object S2GraphHelper {
     }
   }
 
-  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, option: GraphFileOptions): Seq[SKeyValue] = {
-    val relEdges = if (option.autoEdgeCreate) edge.relatedEdges else List(edge)
+  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
+    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
 
     val snapshotEdgeKeyValues = s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues
     val indexEdgeKeyValues = relEdges.flatMap { edge =>
@@ -68,20 +70,59 @@ object S2GraphHelper {
     snapshotEdgeKeyValues ++ indexEdgeKeyValues
   }
 
-  def toSKeyValues(s2: S2Graph, element: GraphElement, option: GraphFileOptions): Seq[SKeyValue] = {
-    try {
-      if (element.isInstanceOf[S2Edge]) {
-        val edge = element.asInstanceOf[S2Edge]
-        insertBulkForLoaderAsync(s2, edge, option)
-      } else if (element.isInstanceOf[S2Vertex]) {
-        val vertex = element.asInstanceOf[S2Vertex]
-        s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
-      } else {
-        Nil
-      }
-    } catch {
-      case e: Exception =>
-        if (option.skipError) Nil else throw e
+  def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = {
+    if (element.isInstanceOf[S2Edge]) {
+      val edge = element.asInstanceOf[S2Edge]
+      insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
+    } else if (element.isInstanceOf[S2Vertex]) {
+      val vertex = element.asInstanceOf[S2Vertex]
+      s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
+    } else {
+      Nil
+    }
+  }
+
+  def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {
+    val timestamp = row.getAs[Long]("timestamp")
+    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+
+    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
+      case Some(propsStr:String) =>
+        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+      case None =>
+        schema.fieldNames.flatMap { field =>
+          if (!reservedColumn.contains(field)) {
+            Seq(
+              field -> getRowValAny(row, field)
+            )
+          } else Nil
+        }.toMap
     }
+
+    elem match {
+      case "e" | "edge" =>
+        val from = getRowValAny(row, "from")
+        val to = getRowValAny(row, "to")
+        val label = row.getAs[String]("label")
+        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+        Some(
+          s2.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
+        )
+      case "v" | "vertex" =>
+        val id = getRowValAny(row, "id")
+        val serviceName = row.getAs[String]("service")
+        val columnName = row.getAs[String]("column")
+        Some(
+          s2.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
+        )
+      case _ =>
+        None
+    }
+  }
+
+  private def getRowValAny(row:Row, fieldName:String):Any = {
+    val idx = row.fieldIndex(fieldName)
+    row.get(idx)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index e855a32..f62189a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -20,6 +20,12 @@
 package org.apache.s2graph.s2jobs.loader
 
 object GraphFileOptions {
+  val OptionKeys = Set(
+    "--input", "--tempDir", "--output", "--zkQuorum", "--table", "--dbUrl", "--dbUser", "--dbPassword", "--dbDriver",
+    "--maxHFilePerRegionServer", "--numRegions", "--labelMapping", "--autoEdgeCreate", "--buildDegree",
+    "--skipError", "--incrementalLoad", "--method"
+  )
+
   val parser = new scopt.OptionParser[GraphFileOptions]("run") {
 
     opt[String]('i', "input").required().action( (x, c) =>
@@ -92,6 +98,10 @@ object GraphFileOptions {
       (inner.head, inner.last)
     }).toMap
   }
+
+  def toLabelMappingString(labelMapping: Map[String, String]): String =
+    labelMapping.map { case (k, v) => Seq(k, v).mkString(":") }.mkString(",")
+
 }
 /**
   * Option case class for TransferToHFile.
@@ -137,4 +147,25 @@ case class GraphFileOptions(input: String = "",
       "db.default.driver" -> dbDriver
     )
   }
+
+  def toCommand: Array[String] =
+    Array(
+      "--input", input,
+      "--tempDir", tempDir,
+      "--output", output,
+      "--zkQuorum", zkQuorum,
+      "--table", tableName,
+      "--dbUrl", dbUrl,
+      "--dbUser", dbUser,
+      "--dbPassword", dbPassword,
+      "--dbDriver", dbDriver,
+      "--maxHFilePerRegionServer", maxHFilePerRegionServer.toString,
+      "--numRegions", numRegions.toString,
+      "--labelMapping", GraphFileOptions.toLabelMappingString(labelMapping),
+      "--autoEdgeCreate", autoEdgeCreate.toString,
+      "--buildDegree", buildDegree.toString,
+      "--skipError", skipError.toString,
+      "--incrementalLoad", incrementalLoad.toString,
+      "--method", method
+    )
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index 8ace94a..2b230c9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -24,12 +24,14 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.ConnectionFactory
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableOutputFormat}
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
-import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
@@ -112,11 +114,22 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
   override def generate(sc: SparkContext,
                         config: Config,
                         rdd: RDD[String],
-                        _options: GraphFileOptions): Unit = {
-    val transformer = new SparkBulkLoaderTransformer(config, _options)
+                        options: GraphFileOptions): Unit = {
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+
+    implicit val reader = new TsvBulkFormatReader
+    implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
+
     val kvs = transformer.transform(rdd).flatMap(kvs => kvs)
 
-    HFileGenerator.generateHFile(sc, config, kvs, _options)
+    HFileGenerator.generateHFile(sc, config, kvs, options)
+  }
+
+  def loadIncrementalHFiles(options: GraphFileOptions): Int = {
+    /* LoadIncrementalHFiles */
+    val hfileArgs = Array(options.output, options.tableName)
+    val hbaseConfig = HBaseConfiguration.create()
+    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index fd78718..9a2e81a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -19,8 +19,6 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import java.util.UUID
-
 import com.typesafe.config.Config
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -30,11 +28,11 @@ import org.apache.hadoop.hbase.io.compress.Compression
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.mapreduce.{GraphHFileOutputFormat, HFileOutputFormat2}
 import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
 import org.apache.hadoop.mapreduce.{Job, Mapper}
-import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
@@ -106,6 +104,10 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] {
                input: RDD[String],
                options: GraphFileOptions): RDD[KeyValue] = {
     val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+    implicit val reader = new TsvBulkFormatReader
+    implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
+
     transformer.transform(input).flatMap(kvs => kvs)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
new file mode 100644
index 0000000..d3ed6bc
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+
+import scala.concurrent.ExecutionContext
+import scala.reflect.ClassTag
+
+class LocalBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
+  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+
+  override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
+    val degrees = elements.flatMap { element =>
+      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+    }.groupBy(_._1).mapValues(_.map(_._2).sum)
+
+    degrees.toSeq.map { case (degreeKey, count) =>
+      writer.writeDegree(s2)(degreeKey, count)
+    }
+  }
+
+  override def transform[S: ClassTag, T: ClassTag](input: Seq[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): Seq[T] = {
+    val elements = input.flatMap(reader.read(s2)(_))
+    val kvs = elements.map(writer.write(s2)(_))
+    val degrees = if (options.buildDegree) buildDegrees[T](elements) else Nil
+
+    kvs ++ degrees
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
new file mode 100644
index 0000000..eec69b9
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+class SparkBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions) extends Transformer[RDD] {
+
+  override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
+    val degrees = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { element =>
+        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+      }
+    }.reduceByKey(_ + _)
+
+    degrees.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map { case (degreeKey, count) =>
+        writer.writeDegree(s2)(degreeKey, count)
+      }
+    }
+  }
+
+  override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
+    val elements = input.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { line =>
+        reader.read(s2)(line)
+      }
+    }
+
+    val kvs = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map(writer.write(s2)(_))
+    }
+
+    if (options.buildDegree) kvs ++ buildDegrees(elements)
+    else kvs
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
index ae082d8..f71a9e8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
@@ -20,7 +20,11 @@
 package org.apache.s2graph.s2jobs.serde
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
 
 trait GraphElementWritable[T] extends Serializable {
+
   def write(s2: S2Graph)(element: GraphElement): T
+
+  def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): T
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
deleted file mode 100644
index a185754..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.serde
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.KeyValue
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-
-import scala.concurrent.ExecutionContext
-
-class LocalBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
-  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
-
-  override val reader = new TsvBulkFormatReader
-  override val writer = new KeyValueWriter(options)
-
-  override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
-
-  override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
-
-  override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
-    val degrees = elements.flatMap { element =>
-      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
-    }.groupBy(_._1).mapValues(_.map(_._2).sum)
-
-    degrees.toSeq.map { case (degreeKey, count) =>
-      DegreeKey.toKeyValue(s2, degreeKey, count)
-    }
-  }
-
-  override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
-
-    kvs ++ degrees
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
deleted file mode 100644
index 63f4e2c..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.serde
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-import org.apache.spark.rdd.RDD
-
-class SparkBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
-  val reader = new TsvBulkFormatReader
-
-  val writer = new KeyValueWriter(options)
-
-  override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.flatMap { line =>
-      reader.read(s2)(line)
-    }
-  }
-
-  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.map(writer.write(s2)(_))
-  }
-
-  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
-    val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.flatMap { element =>
-        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
-      }
-    }.reduceByKey(_ + _)
-
-    degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.map { case (degreeKey, count) =>
-        DegreeKey.toKeyValue(s2, degreeKey, count)
-      }
-    }
-  }
-
-  override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    if (options.buildDegree) kvs ++ buildDegrees(elements)
-    kvs
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
index 3902c63..99afa25 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -21,30 +21,21 @@ package org.apache.s2graph.s2jobs.serde
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+
+import scala.reflect.ClassTag
 
 /**
   * Define serialize/deserialize.
   * Source -> GraphElement
   * GraphElement -> Target
   *
-  * @tparam S : Source class. ex) String, RDF.Statement, ...
-  * @tparam T : Target class. ex) KeyValue, Array[Byte], String, ...
   * @tparam M : Container type. ex) RDD, Seq, List, ...
   */
-trait Transformer[S, T, M[_]] extends Serializable {
+trait Transformer[M[_]] extends Serializable {
   val config: Config
-  val options: GraphFileOptions
-
-  val reader: GraphElementReadable[S]
-
-  val writer: GraphElementWritable[T]
-
-  def read(input: M[S]): M[GraphElement]
-
-  def write(elements: M[GraphElement]): M[T]
 
-  def buildDegrees(elements: M[GraphElement]): M[T]
+  def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T]
 
-  def transform(input: M[S]): M[T]
+  def transform[S: ClassTag, T: ClassTag](input: M[S])
+                           (implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): M[T]
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
new file mode 100644
index 0000000..2a15011
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+import org.apache.spark.sql.Row
+
+class RowBulkFormatReader extends GraphElementReadable[Row] {
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
+
+  override def read(s2: S2Graph)(row: Row): Option[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
index 22eee34..04c1b1d 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
@@ -21,14 +21,28 @@ package org.apache.s2graph.s2jobs.serde.writer
 
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
 import org.apache.s2graph.s2jobs.serde.GraphElementWritable
 
-class KeyValueWriter(option: GraphFileOptions) extends GraphElementWritable[Seq[KeyValue]] {
+class KeyValueWriter(autoEdgeCreate: Boolean = false,
+                     skipError: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] {
   override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = {
-    S2GraphHelper.toSKeyValues(s2, element, option).map { skv =>
-      new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
+    try {
+      S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv =>
+        new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
+      }
+    } catch {
+      case e: Exception =>
+        if (skipError) Nil else throw e
+    }
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Seq[KeyValue] = {
+    try {
+      DegreeKey.toKeyValue(s2, degreeKey, count)
+    } catch {
+      case e: Exception =>
+        if (skipError) Nil else throw e
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 3d5beb6..ab32fad 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,24 +19,34 @@
 
 package org.apache.s2graph.s2jobs.task
 
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 /**
   * Sink
+  *
   * @param queryName
   * @param conf
   */
-abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
+abstract class Sink(queryName: String, override val conf: TaskConf) extends Task {
   val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}/${conf.name}"
   val DEFAULT_TRIGGER_INTERVAL = "10 seconds"
 
-  val FORMAT:String
+  val FORMAT: String
 
-  def preprocess(df:DataFrame):DataFrame = df
+  def preprocess(df: DataFrame): DataFrame = df
 
-  def write(inputDF: DataFrame):Unit = {
+  def write(inputDF: DataFrame): Unit = {
     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
@@ -50,7 +60,7 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
       case "update" => OutputMode.Update()
       case "complete" => OutputMode.Complete()
       case _ => logger.warn(s"${LOG_PREFIX} unsupported output mode. use default output mode 'append'")
-                OutputMode.Append()
+        OutputMode.Append()
     }
     val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
     val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
@@ -88,9 +98,9 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
     writer.save(outputPath)
   }
 
-  protected def repartition(df:DataFrame, defaultParallelism:Int) = {
+  protected def repartition(df: DataFrame, defaultParallelism: Int) = {
     conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match {
-      case Some(numOfPartitions:Int) =>
+      case Some(numOfPartitions: Int) =>
         if (numOfPartitions > defaultParallelism) df.repartition(numOfPartitions)
         else df.coalesce(numOfPartitions)
       case None => df
@@ -100,14 +110,16 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
 
 /**
   * KafkaSink
+  *
   * @param queryName
   * @param conf
   */
-class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class KafkaSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "topic")
+
   override val FORMAT: String = "kafka"
 
-  override def preprocess(df:DataFrame):DataFrame = {
+  override def preprocess(df: DataFrame): DataFrame = {
     import org.apache.spark.sql.functions._
 
     logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
@@ -118,7 +130,7 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
         val columns = df.columns
         df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value"))
-      case format:String =>
+      case format: String =>
         if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format '$format'. use default json format")
         df.selectExpr("to_json(struct(*)) AS value")
     }
@@ -130,21 +142,25 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * FileSink
+  *
   * @param queryName
   * @param conf
   */
-class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class FileSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("path", "format")
+
   override val FORMAT: String = conf.options.getOrElse("format", "parquet")
 }
 
 /**
   * HiveSink
+  *
   * @param queryName
   * @param conf
   */
-class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class HiveSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("database", "table")
+
   override val FORMAT: String = "hive"
 
   override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
@@ -161,11 +177,13 @@ class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * ESSink
+  *
   * @param queryName
   * @param conf
   */
-class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port")
+
   override val FORMAT: String = "es"
 
   override def write(inputDF: DataFrame): Unit = {
@@ -182,14 +200,75 @@ class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * S2graphSink
+  *
   * @param queryName
   * @param conf
   */
-class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set()
+
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
-    throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}")
+  private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
+    val options = TaskConf.toGraphFileOptions(conf)
+    val config = Management.toConfig(options.toConfigParams)
+    val input = df.rdd
+
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+
+    implicit val reader = new RowBulkFormatReader
+    implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
+
+    val kvs = transformer.transform(input)
+
+    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+    // finish the bulk load by executing LoadIncrementalHFiles.
+    if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(options)
+  }
+
+  private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+    import scala.collection.JavaConversions._
+
+    val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+    val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
+
+    val reader = new RowBulkFormatReader
+
+    val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+    val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
+
+    df.foreachPartition { iters =>
+      val config = ConfigFactory.parseString(serializedConfig)
+      val s2Graph = S2GraphHelper.initS2Graph(config)
+
+      val responses = iters.grouped(groupedSize).flatMap { rows =>
+        val elements = rows.flatMap(row => reader.read(s2Graph)(row))
+
+        val mutateF = s2Graph.mutateElements(elements, true)
+        Await.result(mutateF, Duration(waitTime, "seconds"))
+      }
+
+      val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
+      logger.info(s"success : ${success.size}, fail : ${fail.size}")
+    }
+  }
+
+  override def write(inputDF: DataFrame): Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
+      conf.options.getOrElse("writeMethod", "mutate") match {
+        case "mutate" => writeBatchWithMutate(df)
+        case "bulk" =>
+          val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean
+          writeBatchBulkload(df, runLoadIncrementalHFiles)
+        case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)")
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index e8f11e3..ddd56bf 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -20,7 +20,16 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.s2graph.s2jobs.Logger
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 
+object TaskConf {
+  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+    val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
+      .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+
+    GraphFileOptions.toOption(args)
+  }
+}
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
 
 trait Task extends Serializable with Logger {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index e15efe8..f6fecd7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.spark.sql.streaming
 
 import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.{GraphElement, JSONParser}
+import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.Row
@@ -83,51 +84,6 @@ private [sql] class S2StreamQueryWriter(
     }
   }
 
-  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = {
-    val s2Graph = s2SinkContext.getGraph
-    val row = encoder.fromRow(internalRow)
-
-    val timestamp = row.getAs[Long]("timestamp")
-    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
-    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
-
-    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
-      case Some(propsStr:String) =>
-        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
-      case None =>
-        schema.fieldNames.flatMap { field =>
-          if (!RESERVED_COLUMN.contains(field)) {
-            Seq(
-              field -> getRowValAny(row, field)
-            )
-          } else Nil
-        }.toMap
-    }
-
-    elem match {
-      case "e" | "edge" =>
-        val from = getRowValAny(row, "from")
-        val to = getRowValAny(row, "to")
-        val label = row.getAs[String]("label")
-        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
-        Some(
-          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
-        )
-      case "v" | "vertex" =>
-        val id = getRowValAny(row, "id")
-        val serviceName = row.getAs[String]("service")
-        val columnName = row.getAs[String]("column")
-        Some(
-          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
-        )
-      case _ =>
-        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
-        None
-    }
-  }
-
-  private def getRowValAny(row:Row, fieldName:String):Any = {
-    val idx = row.fieldIndex(fieldName)
-    row.get(idx)
-  }
+  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 78000d4..0461d1e 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs
 
 import java.io.{File, PrintWriter}
 
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
@@ -31,11 +32,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.util.Try
 
-class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
-  private val master = "local[2]"
-  private val appName = "example-spark"
+class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
 
-  protected var sc: SparkContext = _
   protected val options = GraphFileOptions(
     input = "/tmp/test.txt",
     tempDir = "/tmp/bulkload_tmp",
@@ -65,18 +63,15 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
 
   override def beforeAll(): Unit = {
     // initialize spark context.
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    sc = new SparkContext(conf)
+    super.beforeAll()
 
     s2 = S2GraphHelper.initS2Graph(s2Config)
     initTestDataFile
   }
 
   override def afterAll(): Unit = {
-    if (sc != null) sc.stop()
+    super.afterAll()
+
     if (s2 != null) s2.shutdown()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
index 6e89466..f2b0102 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -19,6 +19,17 @@
 
 package org.apache.s2graph.s2jobs
 
-class S2GraphHelperTest {
+import org.apache.s2graph.s2jobs.task.TaskConf
 
+class S2GraphHelperTest extends BaseSparkTest {
+  test("toGraphFileOptions") {
+    val args = options.toCommand.grouped(2).map { kv =>
+          kv.head -> kv.last
+      }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql")
+
+    println(args)
+    val taskConf = TaskConf("dummy", "sink", Nil, args)
+    val graphFileOptions = S2GraphHelper.toGraphFileOptions(taskConf)
+    println(graphFileOptions)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 3bd1a23..991897b 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,12 +19,16 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.{PostProcess, S2VertexLike}
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
-import org.apache.s2graph.s2jobs.serde.{LocalBulkLoaderTransformer, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
+import scala.io.Source
+
 class GraphFileGeneratorTest extends BaseSparkTest {
 
   import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
@@ -34,8 +38,12 @@ class GraphFileGeneratorTest extends BaseSparkTest {
   def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = {
     transformerMode match {
       case "spark" =>
-        val input = sc.parallelize(edges)
+        val input: RDD[String] = sc.parallelize(edges)
         val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new TsvBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
           kvs.map { kv =>
@@ -46,12 +54,43 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       case "local" =>
         val input = edges
         val transformer = new LocalBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new TsvBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
           kvs.map { kv =>
             CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
           }
         }.toList
+
+      case "dataset" =>
+        import spark.sqlContext.implicits._
+        val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+        val rows = elements.map { e =>
+          (e.getTs(),
+            e.getOperation(),
+            "e",
+            e.srcVertex.innerIdVal.toString,
+            e.tgtVertex.innerIdVal.toString,
+            e.label(),
+            "{}",
+            e.getDirection())
+        }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction").rdd
+
+        val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new RowBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
+        val kvs = transformer.transform(rows)
+        kvs.flatMap { kvs =>
+          kvs.map { kv =>
+            CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
+          }
+        }.collect().toList
     }
   }
 
@@ -61,7 +100,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
 
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
 
-    val transformerMode = "spark"
+    val transformerMode = "dataset"
     val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
 
     val serDe = s2.defaultStorage.serDe
@@ -143,42 +182,36 @@ class GraphFileGeneratorTest extends BaseSparkTest {
   }
 
   //   this test case expect options.input already exist with valid bulk load format.
-  //  test("bulk load and fetch vertex: spark mode") {
-  //    import scala.collection.JavaConverters._
-  //    val serviceColumn = initTestVertexSchema(s2)
-  //
-  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //    val input = sc.parallelize(bulkVertexLs)
-  //
-  //    HFileGenerator.generate(sc, s2Config, input, options)
-  //
-  //    val hfileArgs = Array(options.output, options.tableName)
-  //    val hbaseConfig = HBaseConfiguration.create()
-  //
-  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //
-  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //    val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //    println(Json.prettyPrint(json))
-  //  }
+    test("bulk load and fetch vertex: spark mode") {
+      import scala.collection.JavaConverters._
+      val serviceColumn = initTestVertexSchema(s2)
+
+      val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+      val input = sc.parallelize(bulkVertexLs)
+
+      HFileGenerator.generate(sc, s2Config, input, options)
+      HFileGenerator.loadIncrementHFile(options)
+
+      val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+      val json = PostProcess.verticesToJson(s2Vertices)
+
+      println(Json.prettyPrint(json))
+    }
 
   //   this test case expect options.input already exist with valid bulk load format.
-  //  test("bulk load and fetch vertex: mr mode") {
-  //    val serviceColumn = initTestVertexSchema(s2)
-  //
-  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //    val input = sc.parallelize(bulkVertexLs)
-  //
-  //    HFileMRGenerator.generate(sc, s2Config, input, options)
-  //
-  //    val hfileArgs = Array(options.output, options.tableName)
-  //    val hbaseConfig = HBaseConfiguration.create()
-  //
-  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //    val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //    println(Json.prettyPrint(json))
-  //  }
+//    test("bulk load and fetch vertex: mr mode") {
+//      import scala.collection.JavaConverters._
+//      val serviceColumn = initTestVertexSchema(s2)
+//
+//      val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+//      val input = sc.parallelize(bulkVertexLs)
+//
+//      HFileMRGenerator.generate(sc, s2Config, input, options)
+//      HFileGenerator.loadIncrementHFile(options)
+//
+//      val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+//      val json = PostProcess.verticesToJson(s2Vertices)
+//
+//      println(Json.prettyPrint(json))
+//    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
new file mode 100644
index 0000000..47d4628
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.JavaConverters._
+
+class SinkTest extends BaseSparkTest {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+  }
+  def toDataFrame(edges: Seq[String]): DataFrame = {
+    import spark.sqlContext.implicits._
+    val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+    elements.map { e =>
+      (e.getTs(),
+        e.getOperation(),
+        "e",
+        e.srcVertex.innerIdVal.toString,
+        e.tgtVertex.innerIdVal.toString,
+        e.label(),
+        "{}",
+        e.getDirection())
+    }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+  }
+
+  test("S2graphSink writeBatchWithBulkload") {
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = Map("writeMethod" -> "bulk") ++
+      options.toCommand.grouped(2).map { kv =>
+        kv.head -> kv.last
+      }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+
+    val filteredEdges = s2Edges.filter{ edge =>
+      edge.srcVertex.innerIdVal.toString == "a" &&
+        edge.tgtVertex.innerIdVal.toString == "b" &&
+        edge.label() == "friends"
+    }
+
+    assert(filteredEdges.size == 1)
+  }
+
+  test("S2graphSink writeBatchWithMutate") {
+    val bulkEdgeString = "1416236400000\tinsert\tedge\tb\tc\tfriends\t{\"since\":1316236400000,\"score\":20}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = Map("writeMethod" -> "mutate") ++
+      options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+
+    val filteredEdges = s2Edges.filter{ edge =>
+      edge.srcVertex.innerIdVal.toString == "b" &&
+        edge.tgtVertex.innerIdVal.toString == "c" &&
+        edge.getTs() == 1416236400000L &&
+        edge.label() == "friends"
+    }
+
+    assert(filteredEdges.size == 1)
+  }
+
+}