You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/23 04:36:43 UTC

[5/9] incubator-s2graph git commit: merge S2EdgeDataFrameWriter and S2VertexDataFrameWriter.

merge S2EdgeDataFrameWriter and S2VertexDataFrameWriter.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5a862aa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5a862aa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5a862aa5

Branch: refs/heads/master
Commit: 5a862aa56c7f531906e1fd2f7480d8db53f72d23
Parents: 31b5192
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Apr 5 20:12:33 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Apr 6 13:59:17 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/PostProcess.scala   |  22 +++
 .../org/apache/s2graph/core/S2EdgeLike.scala    |  10 +-
 .../org/apache/s2graph/core/S2VertexLike.scala  |   5 +-
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  18 +++
 .../org/apache/s2graph/s2jobs/Schema.scala      |  21 +++
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  12 +-
 .../serde/writer/RowDataFrameWriter.scala       |  17 +++
 .../serde/writer/S2EdgeDataFrameWriter.scala    |  50 -------
 .../serde/writer/S2VertexDataFrameWriter.scala  |  51 -------
 .../org/apache/s2graph/s2jobs/task/Source.scala |  30 ++--
 .../apache/s2graph/s2jobs/task/SourceTest.scala | 137 +++++++------------
 11 files changed, 142 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 462c1e4..8e4be5b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -281,4 +281,26 @@ object PostProcess {
       withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery)
     }
   }
+
+  def s2EdgePropsJsonString(edge: S2EdgeLike): String =
+    Json.toJson(s2EdgePropsJson(edge)).toString()
+
+  def s2VertexPropsJsonString(vertex: S2VertexLike): String =
+    Json.toJson(s2VertexPropsJson(vertex)).toString()
+
+  def s2EdgePropsJson(edge: S2EdgeLike): Map[String, JsValue] = {
+    import scala.collection.JavaConverters._
+    for {
+      (k, v) <- edge.getPropsWithTs().asScala.toMap
+      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
+    } yield (v.labelMeta.name -> jsValue)
+  }
+
+  def s2VertexPropsJson(vertex: S2VertexLike): Map[String, JsValue] = {
+    import scala.collection.JavaConverters._
+    for {
+      (k, v) <- vertex.props.asScala.toMap
+      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
+    } yield (v.columnMeta.name -> jsValue)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index f581e52..2321ac8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -21,17 +21,14 @@ package org.apache.s2graph.core
 import java.util
 import java.util.function.BiConsumer
 
-import org.apache.s2graph.core.JSONParser.innerValToJsValue
 import org.apache.s2graph.core.S2Edge.{Props, State}
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.logger
 import org.apache.tinkerpop.gremlin.structure
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, Property, T, Vertex}
 import play.api.libs.json.Json
 
 import scala.concurrent.Await
-import scala.collection.JavaConverters._
 
 trait S2EdgeLike extends Edge with GraphElement {
   val innerGraph: S2GraphLike
@@ -250,10 +247,7 @@ trait S2EdgeLike extends Edge with GraphElement {
 
   def toLogString: String = {
     //    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
-    val propsWithName = for {
-      (k, v) <- propsWithTs.asScala.toMap
-      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
-    } yield (v.labelMeta.name -> jsValue)
+    val propsWithName = PostProcess.s2EdgePropsJsonString(this)
 
     List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, Json.toJson(propsWithName)).mkString("\t")
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index eb01da3..4608ce7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -60,10 +60,7 @@ trait S2VertexLike extends Vertex with GraphElement {
   def defaultProps: util.HashMap[String, S2VertexProperty[_]] = builder.defaultProps
 
   def toLogString(): String = {
-    val propsWithName = for {
-      (k, v) <- props.asScala.toMap
-      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
-    } yield (v.columnMeta.name -> jsValue)
+    val propsWithName = PostProcess.s2VertexPropsJsonString(this)
 
     val (serviceName, columnName) =
       if (!id.storeColId) ("", "")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 845b343..ade62fa 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -59,6 +59,24 @@ object S2GraphHelper {
     }
   }
 
+  def graphElementToSparkSqlRow(s2: S2Graph, element: GraphElement): Row = {
+    element match {
+      case e: S2EdgeLike =>
+        Row(
+          e.getTs(), e.getOperation(), "edge",
+          e.srcVertex.innerId.toIdString(), e.tgtVertex.innerId.toIdString(), e.label(),
+          PostProcess.s2EdgePropsJsonString(e),
+          e.getDirection()
+        )
+      case v: S2VertexLike =>
+        Row(
+          v.ts, GraphUtil.fromOp(v.op), "vertex",
+          v.innerId.toIdString(), v.serviceName, v.columnName,
+          PostProcess.s2VertexPropsJsonString(v)
+        )
+      case _ => throw new IllegalArgumentException(s"$element is not supported.")
+    }
+  }
   def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {
     val timestamp = row.getAs[Long]("timestamp")
     val operation = Try(row.getAs[String]("operation")).getOrElse("insert")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
index 7c9c393..58d3368 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
@@ -32,4 +32,25 @@ object Schema {
     StructField("props", StringType, false),
     StructField("direction", StringType, true)
   ))
+
+  val VertexSchema = StructType(Seq(
+    StructField("timestamp", LongType, false),
+    StructField("operation", StringType, false),
+    StructField("elem", StringType, false),
+    StructField("id", StringType, false),
+    StructField("service", StringType, false),
+    StructField("column", StringType, false),
+    StructField("props", StringType, false)
+  ))
+
+  val EdgeSchema = StructType(Seq(
+    StructField("timestamp", LongType, false),
+    StructField("operation", StringType, false),
+    StructField("elem", StringType, false),
+    StructField("from", StringType, false),
+    StructField("to", StringType, false),
+    StructField("label", StringType, false),
+    StructField("props", StringType, false),
+    StructField("direction", StringType, true)
+  ))
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index a0c14e0..ae9b3a7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -22,26 +22,24 @@ package org.apache.s2graph.s2jobs.loader
 import com.typesafe.config.Config
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Scan}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.mapreduce._
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.{Base64, Bytes}
-import org.apache.hadoop.hbase._
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
-import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.serde.reader.{S2GraphCellReader, TsvBulkFormatReader}
-import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, IdentityWriter, KeyValueWriter}
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.SparkSession
 
 object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
new file mode 100644
index 0000000..7d2b981
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+import org.apache.spark.sql.Row
+
+class RowDataFrameWriter extends GraphElementWritable[Row]{
+  override def write(s2: S2Graph)(element: GraphElement): Row = {
+    S2GraphHelper.graphElementToSparkSqlRow(s2, element)
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Row = {
+    val element = DegreeKey.toEdge(s2, degreeKey, count)
+    S2GraphHelper.graphElementToSparkSqlRow(s2, element)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
deleted file mode 100644
index c2c305c..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.serde.writer
-
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.DegreeKey
-import org.apache.s2graph.s2jobs.serde.GraphElementWritable
-import org.apache.s2graph.s2jobs.serde.writer.S2EdgeDataFrameWriter.S2EdgeTuple
-
-object S2EdgeDataFrameWriter {
-  type S2EdgeTuple = (Long, String, String, String, String, String, String, String)
-  val Fields = Seq("timestamp", "operation", "elem", "from", "to", "label", "props", "direction")
-}
-
-class S2EdgeDataFrameWriter extends GraphElementWritable[S2EdgeTuple] {
-  import S2EdgeDataFrameWriter._
-  private def toGraphElementTuple(tokens: Array[String]): S2EdgeTuple = {
-    tokens match {
-      case Array(ts, op, elem, from, to, label, props, dir) => (ts.toLong, op, elem, from, to, label, props, dir)
-      case Array(ts, op, elem, from, to, label, props) => (ts.toLong, op, elem, from, to, label, props, "out")
-      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
-    }
-  }
-  override def write(s2: S2Graph)(element: GraphElement): S2EdgeTuple = {
-    toGraphElementTuple(element.toLogString().split("\t"))
-  }
-
-  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): S2EdgeTuple = {
-    val element = DegreeKey.toEdge(s2, degreeKey, count)
-
-    toGraphElementTuple(element.toLogString().split("\t"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
deleted file mode 100644
index c37f78e..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.serde.writer
-
-import org.apache.s2graph.core.{GraphElement, S2Graph, S2VertexLike}
-import org.apache.s2graph.s2jobs.DegreeKey
-import org.apache.s2graph.s2jobs.serde.GraphElementWritable
-import org.apache.s2graph.s2jobs.serde.writer.S2VertexDataFrameWriter.S2VertexTuple
-
-object S2VertexDataFrameWriter {
-  type S2VertexTuple = (Long, String, String, String, String, String, String)
-  val EmptyS2VertexTuple = (0L, "", "", "", "", "", "")
-  val Fields = Seq("timestamp", "operation", "elem", "id", "service", "column", "props")
-}
-
-class S2VertexDataFrameWriter extends GraphElementWritable[S2VertexTuple] {
-  import S2VertexDataFrameWriter._
-  private def toVertexTuple(tokens: Array[String]): S2VertexTuple = {
-    tokens match {
-      case Array(ts, op, elem, id, service, column, props) => (ts.toLong, op, elem, id, service, column, props)
-      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
-    }
-  }
-  override def write(s2: S2Graph)(element: GraphElement): S2VertexTuple = {
-    element match {
-      case v: S2VertexLike => toVertexTuple(v.toLogString().split("\t"))
-      case _ => throw new IllegalArgumentException(s"Vertex expected, $element is not vertex.")
-    }
-
-  }
-
-  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): S2VertexTuple =
-    EmptyS2VertexTuple
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 06a28c8..dc5c054 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,11 +19,12 @@
 
 package org.apache.s2graph.s2jobs.task
 
-import org.apache.s2graph.core.{GraphElement, Management}
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.s2jobs.Schema
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
-import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, S2VertexDataFrameWriter}
-import org.apache.spark.sql.{DataFrame, DataFrameWriter, SparkSession}
+import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 
 /**
@@ -112,7 +113,6 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
   override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
 
   override def toDF(ss: SparkSession): DataFrame = {
-    import ss.sqlContext.implicits._
     val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf)
     val config = Management.toConfig(mergedConf)
 
@@ -132,21 +132,11 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
 
 
     implicit val reader = new S2GraphCellReader(elementType)
+    implicit val writer = new RowDataFrameWriter
+    val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
+    val kvs = transformer.transform(cells)
 
-    columnFamily match {
-      case "v" =>
-        implicit val writer = new S2VertexDataFrameWriter
-        val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
-        val kvs = transformer.transform(cells)
-
-        kvs.toDF(S2VertexDataFrameWriter.Fields: _*)
-      case "e" =>
-        implicit val writer = new S2EdgeDataFrameWriter
-        val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
-        val kvs = transformer.transform(cells)
-
-        kvs.toDF(S2EdgeDataFrameWriter.Fields: _*)
-      case _ => throw new IllegalArgumentException(s"$columnFamily is not supported.")
-    }
+    val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
+    ss.sqlContext.createDataFrame(kvs, schema)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a862aa5/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index 9cd52eb..9b9a016 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -20,58 +20,34 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.s2graph.core.{GraphUtil, S2EdgeLike, S2VertexLike}
-import org.apache.s2graph.core.storage.hbase.{AsynchbaseStorage, AsynchbaseStorageManagement}
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.core.{GraphElement, S2EdgeLike, S2VertexLike}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, S2VertexDataFrameWriter}
-import org.apache.s2graph.s2jobs.{BaseSparkTest, S2GraphHelper}
+import org.apache.s2graph.s2jobs.{BaseSparkTest, S2GraphHelper, Schema}
 import org.apache.spark.sql.DataFrame
-
-import scala.collection.JavaConverters._
+import org.apache.spark.sql.types.StructType
 
 class SourceTest extends BaseSparkTest {
   //TODO: props to valid string.
-  def s2VertexToDataFrame(vertices: Seq[String]): DataFrame = {
-    import spark.sqlContext.implicits._
-    val elements = vertices.flatMap(s2.elementBuilder.toVertex(_))
-
-    elements.map { v =>
-      (v.ts, GraphUtil.fromOp(v.op),
-        "v", v.innerId.toIdString(), v.serviceName, v.columnName, "{}")
-    }.toDF(S2VertexDataFrameWriter.Fields: _*)
-  }
-
-  def s2EdgeToDataFrame(edges: Seq[String]): DataFrame = {
-    import spark.sqlContext.implicits._
-    val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
-
-    elements.map { e =>
-      (e.getTs(),
-        e.getOperation(),
-        "e",
-        e.srcVertex.innerIdVal.toString,
-        e.tgtVertex.innerIdVal.toString,
-        e.label(),
-        "{}",
-        e.getDirection())
-    }.toDF(S2EdgeDataFrameWriter.Fields: _*)
+  def toDataFrame(elements: Seq[String], schema: StructType): DataFrame = {
+    val ls = elements.flatMap(s2.elementBuilder.toGraphElement(_)).map { element =>
+      S2GraphHelper.graphElementToSparkSqlRow(s2, element)
+    }
+    val rdd = spark.sparkContext.parallelize(ls)
+    spark.sqlContext.createDataFrame(rdd, schema)
   }
 
-  test("S2GraphSource edge toDF") {
-    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+  def runCheck(data: Seq[String],
+               schema: StructType,
+               columnFamily: String,
+               elementType: String): (Seq[GraphElement], Seq[GraphElement]) = {
     val snapshotTableName = options.tableName + "-snapshot"
 
-    // 1. run S2GraphSink to build(not actually load by using LoadIncrementalLoad) bulk load file.
-    val bulkEdges = Seq(
-      "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}",
-      "1416236400000\tinsert\tedge\ta\tc\tfriends\t{\"since\":1316236400000,\"score\":10}"
-    )
-    val df = s2EdgeToDataFrame(bulkEdges)
+    val df = toDataFrame(data, schema)
 
     val reader = new RowBulkFormatReader
 
-    val inputEdges = df.collect().flatMap(reader.read(s2)(_))
-      .sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
+    val input = df.collect().flatMap(reader.read(s2)(_))
 
     val args = options.toCommand.grouped(2).map { kv =>
       kv.head -> kv.last
@@ -99,15 +75,33 @@ class SourceTest extends BaseSparkTest {
       "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"),
       "restore.path" -> "/tmp/hbase_restore",
       "hbase.table.names" -> s"${snapshotTableName}",
-      "hbase.table.cf" -> "e",
-      "element.type" -> "IndexEdge"
+      "hbase.table.cf" -> columnFamily,
+      "element.type" -> elementType
     ) ++ metaAndHBaseArgs
 
     val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
     val source = new S2GraphSource(dumpConf)
     val realDF = source.toDF(spark)
-    val outputEdges = realDF.collect().flatMap(reader.read(s2)(_))
-      .sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
+
+    realDF.printSchema()
+
+    val output = realDF.collect().flatMap(reader.read(s2)(_))
+
+    (input, output)
+  }
+
+  test("S2GraphSource edge toDF") {
+    val column = initTestVertexSchema(s2)
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+
+    val bulkEdges = Seq(
+      s"1416236400000\tinsert\tedge\ta\tb\t${label.label}\t{}",
+      s"1416236400000\tinsert\tedge\ta\tc\t${label.label}\t{}"
+    )
+
+    val (_inputEdges, _outputEdges) = runCheck(bulkEdges, Schema.EdgeSchema, "e", "IndexEdge")
+    val inputEdges = _inputEdges.sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
+    val outputEdges = _outputEdges.sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
 
     inputEdges.foreach { e => println(s"[Input]: $e")}
     outputEdges.foreach { e => println(s"[Output]: $e")}
@@ -115,61 +109,22 @@ class SourceTest extends BaseSparkTest {
     inputEdges shouldBe outputEdges
   }
 
-  test("S2GraphSource vertex toDF") {
+  ignore("S2GraphSource vertex toDF") {
     val column = initTestVertexSchema(s2)
-    val snapshotTableName = options.tableName + "-snapshot"
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
 
     val bulkVertices = Seq(
       s"1416236400000\tinsert\tvertex\tc\t${column.service.serviceName}\t${column.columnName}\t{}",
       s"1416236400000\tinsert\tvertex\td\t${column.service.serviceName}\t${column.columnName}\t{}"
     )
-    val df = s2VertexToDataFrame(bulkVertices)
 
-    val reader = new RowBulkFormatReader
-
-    val input = df.collect().flatMap(reader.read(s2)(_))
-      .sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
-
-    val args = options.toCommand.grouped(2).map { kv =>
-      kv.head -> kv.last
-    }.toMap ++ Map("writeMethod" -> "bulk", "runLoadIncrementalHFiles" -> "true")
-
-    val conf = TaskConf("test", "sql", Seq("input"), args)
-
-    val sink = new S2GraphSink("testQuery", conf)
-    sink.write(df)
-
-    // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat.
-    s2.defaultStorage.management.asInstanceOf[AsynchbaseStorageManagement].withAdmin(s2.config) { admin =>
-      import scala.collection.JavaConverters._
-      if (admin.listSnapshots(snapshotTableName).asScala.toSet(snapshotTableName))
-        admin.deleteSnapshot(snapshotTableName)
-
-      admin.snapshot(snapshotTableName, TableName.valueOf(options.tableName))
-    }
-
-    // 3. Decode S2GraphSource to parse HFile
-    val metaAndHBaseArgs = options.toConfigParams
-    val hbaseConfig = HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration)
-
-    val dumpArgs = Map(
-      "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"),
-      "restore.path" -> "/tmp/hbase_restore",
-      "hbase.table.names" -> s"${snapshotTableName}",
-      "hbase.table.cf" -> "v",
-      "element.type" -> "Vertex"
-    ) ++ metaAndHBaseArgs
-
-    val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
-    val source = new S2GraphSource(dumpConf)
-    val realDF = source.toDF(spark)
-
-    val output = realDF.collect().flatMap(reader.read(s2)(_))
-      .sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
+    val (_inputVertices, _outputVertices) = runCheck(bulkVertices, Schema.VertexSchema, "v", "Vertex")
+    val inputVertices = _inputVertices.sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
+    val outputVertices = _outputVertices.sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
 
-    input.foreach { e => println(s"[Input]: $e")}
-    output.foreach { e => println(s"[Output]: $e")}
+    inputVertices.foreach { v => println(s"[Input]: $v")}
+    outputVertices.foreach { v => println(s"[Output]: $v")}
 
-    input shouldBe output
+    inputVertices shouldBe outputVertices
   }
 }