Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/05 05:43:11 UTC
[1/4] incubator-s2graph git commit: add skipError option to skip over not-serializable data.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 1799ae456 -> 5a0e4d835
add skipError option to skip over not-serializable data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/63dd6fa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/63dd6fa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/63dd6fa2
Branch: refs/heads/master
Commit: 63dd6fa23803f1a76a86f8e53c6115c4dd15cbf9
Parents: 3332f6b
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Apr 2 13:57:15 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Apr 2 13:57:15 2018 +0900
----------------------------------------------------------------------
.../apache/s2graph/s2jobs/S2GraphHelper.scala | 28 +++---
.../s2jobs/loader/GraphFileOptions.scala | 14 +--
.../s2graph/s2jobs/loader/HFileGenerator.scala | 1 +
.../s2jobs/loader/HFileMRGenerator.scala | 1 +
.../loader/LocalBulkLoaderTransformer.scala | 61 -------------
.../loader/SparkBulkLoaderTransformer.scala | 76 ----------------
.../serde/LocalBulkLoaderTransformer.scala | 61 +++++++++++++
.../serde/SparkBulkLoaderTransformer.scala | 76 ++++++++++++++++
.../s2jobs/serde/writer/KeyValueWriter.scala | 5 +-
.../s2jobs/loader/GraphFileGeneratorTest.scala | 95 ++++++++++----------
10 files changed, 214 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 3f80e8f..383f39f 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,6 +24,7 @@ import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.storage.SKeyValue
import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
import play.api.libs.json.Json
import scala.concurrent.ExecutionContext
@@ -54,8 +55,8 @@ object S2GraphHelper {
}
}
- private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
- val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
+ private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, option: GraphFileOptions): Seq[SKeyValue] = {
+ val relEdges = if (option.autoEdgeCreate) edge.relatedEdges else List(edge)
val snapshotEdgeKeyValues = s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues
val indexEdgeKeyValues = relEdges.flatMap { edge =>
@@ -67,15 +68,20 @@ object S2GraphHelper {
snapshotEdgeKeyValues ++ indexEdgeKeyValues
}
- def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = {
- if (element.isInstanceOf[S2Edge]) {
- val edge = element.asInstanceOf[S2Edge]
- insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
- } else if (element.isInstanceOf[S2Vertex]) {
- val vertex = element.asInstanceOf[S2Vertex]
- s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
- } else {
- Nil
+ def toSKeyValues(s2: S2Graph, element: GraphElement, option: GraphFileOptions): Seq[SKeyValue] = {
+ try {
+ if (element.isInstanceOf[S2Edge]) {
+ val edge = element.asInstanceOf[S2Edge]
+ insertBulkForLoaderAsync(s2, edge, option)
+ } else if (element.isInstanceOf[S2Vertex]) {
+ val vertex = element.asInstanceOf[S2Vertex]
+ s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
+ } else {
+ Nil
+ }
+ } catch {
+ case e: Exception =>
+ if (option.skipError) Nil else throw e
}
}
}
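In effect, toSKeyValues now swallows per-element serialization failures instead of failing the whole job. A minimal sketch of the new calling pattern, assuming an initialized S2Graph (s2) and an already-parsed GraphElement (element):

    import org.apache.s2graph.s2jobs.S2GraphHelper
    import org.apache.s2graph.s2jobs.loader.GraphFileOptions

    val options = GraphFileOptions(skipError = true) // all other fields keep their defaults

    val kvs = S2GraphHelper.toSKeyValues(s2, element, options)
    // kvs is Nil when serialization throws and skipError is set;
    // with skipError = false the original exception propagates.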
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index 3e3ffb9..e855a32 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -50,12 +50,10 @@ object GraphFileOptions {
c.copy(dbDriver = x)).text("jdbc driver class.")
opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) =>
- c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer."
- )
+ c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer.")
opt[Int]('n', "numRegions").action ( (x, c) =>
- c.copy(numRegions = x)).text("total numRegions(pre-split size) on table."
- )
+ c.copy(numRegions = x)).text("total numRegions(pre-split size) on table.")
opt[String]('l', "labelMapping").action( (x, c) =>
c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)")
@@ -67,8 +65,11 @@ object GraphFileOptions {
c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
opt[Boolean]('c', "incrementalLoad").action( (x, c) =>
- c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not."
- )
+ c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not.")
+
+ opt[Boolean]('s', "skipError").action ((x, c) =>
+ c.copy(skipError = x)).text("whether skip error row.")
+
opt[String]('m', "method").action( (x, c) =>
c.copy(method = x)).text("run method. currently MR(default)/SPARK supported."
)
@@ -124,6 +125,7 @@ case class GraphFileOptions(input: String = "",
autoEdgeCreate: Boolean = false,
buildDegree: Boolean = false,
incrementalLoad: Boolean = false,
+ skipError: Boolean = false,
compressionAlgorithm: String = "NONE",
method: String = "SPARK") {
def toConfigParams = {
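For reference, a sketch of enabling the new flag through the scopt parser; the path is a placeholder and any other required options would have to be supplied the same way:

    val args = Array(
      "--input", "/tmp/bulk_input",
      "--skipError", "true")

    // parse returns Some(options) only when every required option is present;
    // otherwise scopt prints a usage message and returns None.
    GraphFileOptions.parser.parse(args, GraphFileOptions()).foreach { opts =>
      assert(opts.skipError)
    }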
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index b4ac51f..8ace94a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index 3502bee..fd78718 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
import org.apache.hadoop.mapreduce.{Job, Mapper}
+import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
deleted file mode 100644
index 7d405a6..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.loader
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.KeyValue
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-
-import scala.concurrent.ExecutionContext
-
-class LocalBulkLoaderTransformer(val config: Config,
- val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
- val s2: S2Graph = S2GraphHelper.initS2Graph(config)
-
- override val reader = new TsvBulkFormatReader
- override val writer = new KeyValueWriter
-
- override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
-
- override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
-
- override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
- val degrees = elements.flatMap { element =>
- DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
- }.groupBy(_._1).mapValues(_.map(_._2).sum)
-
- degrees.toSeq.map { case (degreeKey, count) =>
- DegreeKey.toKeyValue(s2, degreeKey, count)
- }
- }
-
- override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
- val elements = read(input)
- val kvs = write(elements)
-
- val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
-
- kvs ++ degrees
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
deleted file mode 100644
index cd991e1..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.loader
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-import org.apache.spark.rdd.RDD
-
-class SparkBulkLoaderTransformer(val config: Config,
- val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
- val reader = new TsvBulkFormatReader
-
- val writer = new KeyValueWriter
-
- override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
-
- iter.flatMap { line =>
- reader.read(s2)(line)
- }
- }
-
- override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
-
- iter.map(writer.write(s2)(_))
- }
-
- override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
- val degrees = elements.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
-
- iter.flatMap { element =>
- DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
- }
- }.reduceByKey(_ + _)
-
- degrees.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
-
- iter.map { case (degreeKey, count) =>
- DegreeKey.toKeyValue(s2, degreeKey, count)
- }
- }
- }
-
- override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
- val elements = read(input)
- val kvs = write(elements)
-
- if (options.buildDegree) kvs ++ buildDegrees(elements)
- kvs
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
new file mode 100644
index 0000000..a185754
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.KeyValue
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+
+import scala.concurrent.ExecutionContext
+
+class LocalBulkLoaderTransformer(val config: Config,
+ val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
+ val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+
+ override val reader = new TsvBulkFormatReader
+ override val writer = new KeyValueWriter(options)
+
+ override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
+
+ override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
+
+ override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
+ val degrees = elements.flatMap { element =>
+ DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+ }.groupBy(_._1).mapValues(_.map(_._2).sum)
+
+ degrees.toSeq.map { case (degreeKey, count) =>
+ DegreeKey.toKeyValue(s2, degreeKey, count)
+ }
+ }
+
+ override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
+ val elements = read(input)
+ val kvs = write(elements)
+
+ val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
+
+ kvs ++ degrees
+ }
+}
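A sketch of driving this transformer end to end, assuming a valid Config and GraphFileOptions plus input lines in the TSV bulk-load format:

    implicit val ec = scala.concurrent.ExecutionContext.Implicits.global

    val transformer = new LocalBulkLoaderTransformer(config, options)
    val bulkLines: Seq[String] = Seq.empty // TSV bulk-format rows go here
    val kvs: Seq[Seq[KeyValue]] = transformer.transform(bulkLines)
    // degree KeyValues are appended when options.buildDegree is set.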
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
new file mode 100644
index 0000000..63f4e2c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+
+class SparkBulkLoaderTransformer(val config: Config,
+ val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
+ val reader = new TsvBulkFormatReader
+
+ val writer = new KeyValueWriter(options)
+
+ override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
+ val s2 = S2GraphHelper.initS2Graph(config)
+
+ iter.flatMap { line =>
+ reader.read(s2)(line)
+ }
+ }
+
+ override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
+ val s2 = S2GraphHelper.initS2Graph(config)
+
+ iter.map(writer.write(s2)(_))
+ }
+
+ override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
+ val degrees = elements.mapPartitions { iter =>
+ val s2 = S2GraphHelper.initS2Graph(config)
+
+ iter.flatMap { element =>
+ DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+ }
+ }.reduceByKey(_ + _)
+
+ degrees.mapPartitions { iter =>
+ val s2 = S2GraphHelper.initS2Graph(config)
+
+ iter.map { case (degreeKey, count) =>
+ DegreeKey.toKeyValue(s2, degreeKey, count)
+ }
+ }
+ }
+
+ override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
+ val elements = read(input)
+ val kvs = write(elements)
+
+ if (options.buildDegree) kvs ++ buildDegrees(elements)
+ kvs
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
index 02034af..22eee34 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
@@ -22,11 +22,12 @@ package org.apache.s2graph.s2jobs.serde.writer
import org.apache.hadoop.hbase.KeyValue
import org.apache.s2graph.core.{GraphElement, S2Graph}
import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
import org.apache.s2graph.s2jobs.serde.GraphElementWritable
-class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] {
+class KeyValueWriter(option: GraphFileOptions) extends GraphElementWritable[Seq[KeyValue]] {
override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = {
- S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv =>
+ S2GraphHelper.toSKeyValues(s2, element, option).map { skv =>
new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
}
}
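The writer now takes the whole option bundle instead of a lone autoEdgeCreate flag, so skipError reaches the serialization path. A sketch, again assuming an initialized S2Graph (s2) and a GraphElement (element):

    import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
    import org.apache.s2graph.s2jobs.loader.GraphFileOptions

    val writer = new KeyValueWriter(GraphFileOptions(skipError = true))
    val hbaseKvs: Seq[KeyValue] = writer.write(s2)(element)
    // empty when the element failed to serialize and skipError is set.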
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 3fbbd88..3bd1a23 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,20 +19,18 @@
package org.apache.s2graph.s2jobs.loader
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{PostProcess, S2VertexLike}
+import org.apache.s2graph.core.PostProcess
import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.s2graph.s2jobs.serde.{LocalBulkLoaderTransformer, SparkBulkLoaderTransformer}
import play.api.libs.json.Json
-import scala.io.Source
-
class GraphFileGeneratorTest extends BaseSparkTest {
- import scala.concurrent.ExecutionContext.Implicits.global
+
import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+ import scala.concurrent.ExecutionContext.Implicits.global
+
def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = {
transformerMode match {
case "spark" =>
@@ -56,6 +54,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
}.toList
}
}
+
test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
/* end of initialize model */
@@ -121,7 +120,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
println(Json.prettyPrint(jsValue))
}
- bulkVertex shouldBe(vertex)
+ bulkVertex shouldBe (vertex)
}
test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
@@ -140,46 +139,46 @@ class GraphFileGeneratorTest extends BaseSparkTest {
println(Json.prettyPrint(jsValue))
}
- bulkVertex shouldBe(vertex)
+ bulkVertex shouldBe (vertex)
}
-// this test case expect options.input already exist with valid bulk load format.
-// test("bulk load and fetch vertex: spark mode") {
-// import scala.collection.JavaConverters._
-// val serviceColumn = initTestVertexSchema(s2)
-//
-// val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-// val input = sc.parallelize(bulkVertexLs)
-//
-// HFileGenerator.generate(sc, s2Config, input, options)
-//
-// val hfileArgs = Array(options.output, options.tableName)
-// val hbaseConfig = HBaseConfiguration.create()
-//
-// val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//
-// val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-// val json = PostProcess.verticesToJson(s2Vertices)
-//
-// println(Json.prettyPrint(json))
-// }
-
-// this test case expect options.input already exist with valid bulk load format.
-// test("bulk load and fetch vertex: mr mode") {
-// val serviceColumn = initTestVertexSchema(s2)
-//
-// val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-// val input = sc.parallelize(bulkVertexLs)
-//
-// HFileMRGenerator.generate(sc, s2Config, input, options)
-//
-// val hfileArgs = Array(options.output, options.tableName)
-// val hbaseConfig = HBaseConfiguration.create()
-//
-// val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-// val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-// val json = PostProcess.verticesToJson(s2Vertices)
-//
-// println(Json.prettyPrint(json))
-// }
+ // this test case expect options.input already exist with valid bulk load format.
+ // test("bulk load and fetch vertex: spark mode") {
+ // import scala.collection.JavaConverters._
+ // val serviceColumn = initTestVertexSchema(s2)
+ //
+ // val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+ // val input = sc.parallelize(bulkVertexLs)
+ //
+ // HFileGenerator.generate(sc, s2Config, input, options)
+ //
+ // val hfileArgs = Array(options.output, options.tableName)
+ // val hbaseConfig = HBaseConfiguration.create()
+ //
+ // val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+ //
+ // val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+ // val json = PostProcess.verticesToJson(s2Vertices)
+ //
+ // println(Json.prettyPrint(json))
+ // }
+
+ // this test case expect options.input already exist with valid bulk load format.
+ // test("bulk load and fetch vertex: mr mode") {
+ // val serviceColumn = initTestVertexSchema(s2)
+ //
+ // val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+ // val input = sc.parallelize(bulkVertexLs)
+ //
+ // HFileMRGenerator.generate(sc, s2Config, input, options)
+ //
+ // val hfileArgs = Array(options.output, options.tableName)
+ // val hbaseConfig = HBaseConfiguration.create()
+ //
+ // val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+ // val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+ // val json = PostProcess.verticesToJson(s2Vertices)
+ //
+ // println(Json.prettyPrint(json))
+ // }
}
[3/4] incubator-s2graph git commit: Merge branch 'master' into S2GRAPH-198
Posted by st...@apache.org.
Merge branch 'master' into S2GRAPH-198
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/acf5ccf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/acf5ccf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/acf5ccf4
Branch: refs/heads/master
Commit: acf5ccf4b806d8f9699802201b98c9bab4dbff2e
Parents: 1b8a13c 1799ae4
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Apr 5 13:30:17 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Apr 5 13:32:33 2018 +0900
----------------------------------------------------------------------
CHANGES | 1 +
.../org/apache/s2graph/s2jobs/task/Sink.scala | 10 +-
.../s2jobs/dump/GraphFileDumperTest.scala | 97 --------------------
.../apache/s2graph/s2jobs/task/SinkTest.scala | 10 +-
4 files changed, 13 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/acf5ccf4/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
[4/4] incubator-s2graph git commit: [S2GRAPH-198]: Skip json Decode error and continue
Posted by st...@apache.org.
[S2GRAPH-198]: Skip json Decode error and continue
JIRA:
[S2GRAPH-198] https://issues.apache.org/jira/browse/S2GRAPH-198
Pull Request:
Closes #150
Author:
DO YUNG YOON <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5a0e4d83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5a0e4d83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5a0e4d83
Branch: refs/heads/master
Commit: 5a0e4d83575beab279bd98181ff852b5f01dd7c3
Parents: acf5ccf
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Apr 5 14:42:39 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Apr 5 14:42:39 2018 +0900
----------------------------------------------------------------------
CHANGES | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5a0e4d83/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 077f641..775ccfa 100644
--- a/CHANGES
+++ b/CHANGES
@@ -65,6 +65,7 @@ Release Notes - S2Graph - Version 0.2.0
* [S2GRAPH-169] - Separate multiple functionalities on Storage class into multiple Interface.
* [S2GRAPH-170] - Create Interface for S2Edge/S2Vertex/S2Graph.
* [S2GRAPH-182] - Version up spark dependencies.
+ * [S2GRAPH-198] - Skip json Decode error and continue.
** New Feature
* [S2GRAPH-123] - Support different index on out/in direction.
[2/4] incubator-s2graph git commit: Merge upstream master.
Posted by st...@apache.org.
Merge upstream master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1b8a13c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1b8a13c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1b8a13c3
Branch: refs/heads/master
Commit: 1b8a13c315eb054efa40f25d6dddde85ce2672af
Parents: 63dd6fa
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Apr 5 13:22:13 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Apr 5 13:22:13 2018 +0900
----------------------------------------------------------------------
.../apache/s2graph/s2jobs/S2GraphHelper.scala | 77 ++++++++++---
.../s2jobs/loader/GraphFileOptions.scala | 31 +++++
.../s2graph/s2jobs/loader/HFileGenerator.scala | 23 +++-
.../s2jobs/loader/HFileMRGenerator.scala | 10 +-
.../loader/LocalBulkLoaderTransformer.scala | 51 +++++++++
.../loader/SparkBulkLoaderTransformer.scala | 69 +++++++++++
.../s2jobs/serde/GraphElementWritable.scala | 4 +
.../serde/LocalBulkLoaderTransformer.scala | 61 ----------
.../serde/SparkBulkLoaderTransformer.scala | 76 -------------
.../s2graph/s2jobs/serde/Transformer.scala | 21 +---
.../serde/reader/RowBulkFormatReader.scala | 33 ++++++
.../s2jobs/serde/writer/KeyValueWriter.scala | 24 +++-
.../org/apache/s2graph/s2jobs/task/Sink.scala | 111 +++++++++++++++---
.../org/apache/s2graph/s2jobs/task/Task.scala | 9 ++
.../sql/streaming/S2StreamQueryWriter.scala | 50 +-------
.../apache/s2graph/s2jobs/BaseSparkTest.scala | 15 +--
.../s2graph/s2jobs/S2GraphHelperTest.scala | 13 ++-
.../s2jobs/loader/GraphFileGeneratorTest.scala | 113 ++++++++++++-------
.../apache/s2graph/s2jobs/task/SinkTest.scala | 98 ++++++++++++++++
19 files changed, 591 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 383f39f..6e68d28 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,10 +24,12 @@ import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.storage.SKeyValue
import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import play.api.libs.json.Json
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+import play.api.libs.json.{JsObject, Json}
import scala.concurrent.ExecutionContext
+import scala.util.Try
object S2GraphHelper {
def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
@@ -55,8 +57,8 @@ object S2GraphHelper {
}
}
- private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, option: GraphFileOptions): Seq[SKeyValue] = {
- val relEdges = if (option.autoEdgeCreate) edge.relatedEdges else List(edge)
+ private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
+ val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
val snapshotEdgeKeyValues = s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues
val indexEdgeKeyValues = relEdges.flatMap { edge =>
@@ -68,20 +70,59 @@ object S2GraphHelper {
snapshotEdgeKeyValues ++ indexEdgeKeyValues
}
- def toSKeyValues(s2: S2Graph, element: GraphElement, option: GraphFileOptions): Seq[SKeyValue] = {
- try {
- if (element.isInstanceOf[S2Edge]) {
- val edge = element.asInstanceOf[S2Edge]
- insertBulkForLoaderAsync(s2, edge, option)
- } else if (element.isInstanceOf[S2Vertex]) {
- val vertex = element.asInstanceOf[S2Vertex]
- s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
- } else {
- Nil
- }
- } catch {
- case e: Exception =>
- if (option.skipError) Nil else throw e
+ def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = {
+ if (element.isInstanceOf[S2Edge]) {
+ val edge = element.asInstanceOf[S2Edge]
+ insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
+ } else if (element.isInstanceOf[S2Vertex]) {
+ val vertex = element.asInstanceOf[S2Vertex]
+ s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
+ } else {
+ Nil
+ }
+ }
+
+ def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {
+ val timestamp = row.getAs[Long]("timestamp")
+ val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+ val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+
+ val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
+ case Some(propsStr:String) =>
+ JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+ case None =>
+ schema.fieldNames.flatMap { field =>
+ if (!reservedColumn.contains(field)) {
+ Seq(
+ field -> getRowValAny(row, field)
+ )
+ } else Nil
+ }.toMap
}
+
+ elem match {
+ case "e" | "edge" =>
+ val from = getRowValAny(row, "from")
+ val to = getRowValAny(row, "to")
+ val label = row.getAs[String]("label")
+ val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+ Some(
+ s2.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
+ )
+ case "v" | "vertex" =>
+ val id = getRowValAny(row, "id")
+ val serviceName = row.getAs[String]("service")
+ val columnName = row.getAs[String]("column")
+ Some(
+ s2.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
+ )
+ case _ =>
+ None
+ }
+ }
+
+ private def getRowValAny(row:Row, fieldName:String):Any = {
+ val idx = row.fieldIndex(fieldName)
+ row.get(idx)
}
}
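sparkSqlRowToGraphElement maps a Spark SQL Row onto an edge or vertex, reading properties from a "props" JSON column when present and from the non-reserved columns otherwise. A sketch of the edge path under assumed conditions (a SparkSession named spark, an initialized S2Graph s2, and a label "friend" that exists in the meta store):

    // hypothetical input: "props" carries a JSON object, the rest are reserved columns
    val df = spark.createDataFrame(Seq(
      (10L, "u1", "u2", "friend", """{"weight":10}""")
    )).toDF("timestamp", "from", "to", "label", "props")

    val reserved = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
    val elements = df.collect().flatMap { row =>
      // elem defaults to "e", operation to "insert", direction to "out"
      S2GraphHelper.sparkSqlRowToGraphElement(s2, row, df.schema, reserved)
    }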
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index e855a32..f62189a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -20,6 +20,12 @@
package org.apache.s2graph.s2jobs.loader
object GraphFileOptions {
+ val OptionKeys = Set(
+ "--input", "--tempDir", "--output", "--zkQuorum", "--table", "--dbUrl", "--dbUser", "--dbPassword", "--dbDriver",
+ "--maxHFilePerRegionServer", "--numRegions", "--labelMapping", "--autoEdgeCreate", "--buildDegree",
+ "--skipError", "--incrementalLoad", "--method"
+ )
+
val parser = new scopt.OptionParser[GraphFileOptions]("run") {
opt[String]('i', "input").required().action( (x, c) =>
@@ -92,6 +98,10 @@ object GraphFileOptions {
(inner.head, inner.last)
}).toMap
}
+
+ def toLabelMappingString(labelMapping: Map[String, String]): String =
+ labelMapping.map { case (k, v) => Seq(k, v).mkString(":") }.mkString(",")
+
}
/**
* Option case class for TransferToHFile.
@@ -137,4 +147,25 @@ case class GraphFileOptions(input: String = "",
"db.default.driver" -> dbDriver
)
}
+
+ def toCommand: Array[String] =
+ Array(
+ "--input", input,
+ "--tempDir", tempDir,
+ "--output", output,
+ "--zkQuorum", zkQuorum,
+ "--table", tableName,
+ "--dbUrl", dbUrl,
+ "--dbUser", dbUser,
+ "--dbPassword", dbPassword,
+ "--dbDriver", dbDriver,
+ "--maxHFilePerRegionServer", maxHFilePerRegionServer.toString,
+ "--numRegions", numRegions.toString,
+ "--labelMapping", GraphFileOptions.toLabelMappingString(labelMapping),
+ "--autoEdgeCreate", autoEdgeCreate.toString,
+ "--buildDegree", buildDegree.toString,
+ "--skipError", skipError.toString,
+ "--incrementalLoad", incrementalLoad.toString,
+ "--method", method
+ )
}
\ No newline at end of file
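toCommand turns an option set back into a plain argument vector, which makes GraphFileOptions easy to hand across process boundaries. A sketch of the expected round trip, assuming every field survives its string conversion:

    val original = GraphFileOptions(input = "/tmp/in", skipError = true)
    val reparsed = GraphFileOptions.parser.parse(original.toCommand, GraphFileOptions())
    // reparsed should be Some(original) when parsing succeeds.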
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index 8ace94a..2b230c9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -24,12 +24,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.hadoop.util.ToolRunner
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
-import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
@@ -112,11 +114,22 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
override def generate(sc: SparkContext,
config: Config,
rdd: RDD[String],
- _options: GraphFileOptions): Unit = {
- val transformer = new SparkBulkLoaderTransformer(config, _options)
+ options: GraphFileOptions): Unit = {
+ val transformer = new SparkBulkLoaderTransformer(config, options)
+
+ implicit val reader = new TsvBulkFormatReader
+ implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
+
val kvs = transformer.transform(rdd).flatMap(kvs => kvs)
- HFileGenerator.generateHFile(sc, config, kvs, _options)
+ HFileGenerator.generateHFile(sc, config, kvs, options)
+ }
+
+ def loadIncrementalHFiles(options: GraphFileOptions): Int = {
+ /* LoadIncrementHFiles */
+ val hfileArgs = Array(options.output, options.tableName)
+ val hbaseConfig = HBaseConfiguration.create()
+ ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index fd78718..9a2e81a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -19,8 +19,6 @@
package org.apache.s2graph.s2jobs.loader
-import java.util.UUID
-
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -30,11 +28,11 @@ import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.mapreduce.{GraphHFileOutputFormat, HFileOutputFormat2}
import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
import org.apache.hadoop.mapreduce.{Job, Mapper}
-import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
@@ -106,6 +104,10 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] {
input: RDD[String],
options: GraphFileOptions): RDD[KeyValue] = {
val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+ implicit val reader = new TsvBulkFormatReader
+ implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
+
transformer.transform(input).flatMap(kvs => kvs)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
new file mode 100644
index 0000000..d3ed6bc
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+
+import scala.concurrent.ExecutionContext
+import scala.reflect.ClassTag
+
+class LocalBulkLoaderTransformer(val config: Config,
+ val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
+ val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+
+ override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
+ val degrees = elements.flatMap { element =>
+ DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+ }.groupBy(_._1).mapValues(_.map(_._2).sum)
+
+ degrees.toSeq.map { case (degreeKey, count) =>
+ writer.writeDegree(s2)(degreeKey, count)
+ }
+ }
+
+ override def transform[S: ClassTag, T: ClassTag](input: Seq[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): Seq[T] = {
+ val elements = input.flatMap(reader.read(s2)(_))
+ val kvs = elements.map(writer.write(s2)(_))
+ val degrees = if (options.buildDegree) buildDegrees[T](elements) else Nil
+
+ kvs ++ degrees
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
new file mode 100644
index 0000000..eec69b9
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+class SparkBulkLoaderTransformer(val config: Config,
+ val options: GraphFileOptions) extends Transformer[RDD] {
+
+ override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
+ val degrees = elements.mapPartitions { iter =>
+ val s2 = S2GraphHelper.initS2Graph(config)
+
+ iter.flatMap { element =>
+ DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+ }
+ }.reduceByKey(_ + _)
+
+ degrees.mapPartitions { iter =>
+ val s2 = S2GraphHelper.initS2Graph(config)
+
+ iter.map { case (degreeKey, count) =>
+ writer.writeDegree(s2)(degreeKey, count)
+ }
+ }
+ }
+
+ override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
+ val elements = input.mapPartitions { iter =>
+ val s2 = S2GraphHelper.initS2Graph(config)
+
+ iter.flatMap { line =>
+ reader.read(s2)(line)
+ }
+ }
+
+ val kvs = elements.mapPartitions { iter =>
+ val s2 = S2GraphHelper.initS2Graph(config)
+
+ iter.map(writer.write(s2)(_))
+ }
+
+ if (options.buildDegree) kvs ++ buildDegrees(elements)
+ else kvs
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
index ae082d8..f71a9e8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
@@ -20,7 +20,11 @@
package org.apache.s2graph.s2jobs.serde
import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
trait GraphElementWritable[T] extends Serializable {
+
def write(s2: S2Graph)(element: GraphElement): T
+
+ def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): T
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
deleted file mode 100644
index a185754..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.serde
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.KeyValue
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-
-import scala.concurrent.ExecutionContext
-
-class LocalBulkLoaderTransformer(val config: Config,
- val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
- val s2: S2Graph = S2GraphHelper.initS2Graph(config)
-
- override val reader = new TsvBulkFormatReader
- override val writer = new KeyValueWriter(options)
-
- override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
-
- override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
-
- override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
- val degrees = elements.flatMap { element =>
- DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
- }.groupBy(_._1).mapValues(_.map(_._2).sum)
-
- degrees.toSeq.map { case (degreeKey, count) =>
- DegreeKey.toKeyValue(s2, degreeKey, count)
- }
- }
-
- override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
- val elements = read(input)
- val kvs = write(elements)
-
- val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
-
- kvs ++ degrees
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
deleted file mode 100644
index 63f4e2c..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.serde
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-import org.apache.spark.rdd.RDD
-
-class SparkBulkLoaderTransformer(val config: Config,
- val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
- val reader = new TsvBulkFormatReader
-
- val writer = new KeyValueWriter(options)
-
- override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
-
- iter.flatMap { line =>
- reader.read(s2)(line)
- }
- }
-
- override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
-
- iter.map(writer.write(s2)(_))
- }
-
- override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
- val degrees = elements.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
-
- iter.flatMap { element =>
- DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
- }
- }.reduceByKey(_ + _)
-
- degrees.mapPartitions { iter =>
- val s2 = S2GraphHelper.initS2Graph(config)
-
- iter.map { case (degreeKey, count) =>
- DegreeKey.toKeyValue(s2, degreeKey, count)
- }
- }
- }
-
- override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
- val elements = read(input)
- val kvs = write(elements)
-
- if (options.buildDegree) kvs ++ buildDegrees(elements)
- kvs
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
index 3902c63..99afa25 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -21,30 +21,21 @@ package org.apache.s2graph.s2jobs.serde
import com.typesafe.config.Config
import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+
+import scala.reflect.ClassTag
/**
* Define serialize/deserialize.
* Source -> GraphElement
* GraphElement -> Target
*
- * @tparam S : Source class. ex) String, RDF.Statement, ...
- * @tparam T : Target class. ex) KeyValue, Array[Byte], String, ...
* @tparam M : Container type. ex) RDD, Seq, List, ...
*/
-trait Transformer[S, T, M[_]] extends Serializable {
+trait Transformer[M[_]] extends Serializable {
val config: Config
- val options: GraphFileOptions
-
- val reader: GraphElementReadable[S]
-
- val writer: GraphElementWritable[T]
-
- def read(input: M[S]): M[GraphElement]
-
- def write(elements: M[GraphElement]): M[T]
- def buildDegrees(elements: M[GraphElement]): M[T]
+ def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T]
- def transform(input: M[S]): M[T]
+ def transform[S: ClassTag, T: ClassTag](input: M[S])
+ (implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): M[T]
}
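
With the reader and writer gone from the constructor, callers now pick a serialization pair per transform call through implicit scope. A minimal sketch of driving the refactored trait, mirroring the test code further down; it assumes the SparkBulkLoaderTransformer that now lives under the loader package (as imported in the Sink.scala diff below), plus sc, s2Config, options, and a bulkEdgeLines: Seq[String] of TSV bulk rows from the surrounding test scaffolding:

    import org.apache.s2graph.s2jobs.loader.SparkBulkLoaderTransformer
    import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
    import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
    import org.apache.spark.rdd.RDD

    // TSV bulk-format lines in, HBase KeyValues out; transform resolves the
    // reader/writer pair from implicit scope instead of constructor fields.
    val input: RDD[String] = sc.parallelize(bulkEdgeLines)
    val transformer = new SparkBulkLoaderTransformer(s2Config, options)

    implicit val reader = new TsvBulkFormatReader
    implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)

    val kvs = transformer.transform(input) // RDD[Seq[KeyValue]]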
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
new file mode 100644
index 0000000..2a15011
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+import org.apache.spark.sql.Row
+
+class RowBulkFormatReader extends GraphElementReadable[Row] {
+ private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
+
+ override def read(s2: S2Graph)(row: Row): Option[GraphElement] =
+ S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN)
+
+}
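
The new reader treats Spark SQL rows as a first-class bulk format: the reserved columns carry the element structure, and everything else is left to the props handling inside sparkSqlRowToGraphElement. A hedged usage sketch, assuming the spark session and an initialized S2Graph named s2 from the test scaffolding:

    import org.apache.s2graph.core.GraphElement
    import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
    import spark.sqlContext.implicits._

    val reader = new RowBulkFormatReader

    // Column names follow RESERVED_COLUMN plus a JSON "props" payload.
    val df = Seq(
      (1416236400000L, "insert", "e", "a", "b", "friends", "{}", "out")
    ).toDF("timestamp", "operation", "elem", "from", "to", "label", "props", "direction")

    val elements: Seq[GraphElement] =
      df.collect().flatMap(row => reader.read(s2)(row)).toSeq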
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
index 22eee34..04c1b1d 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
@@ -21,14 +21,28 @@ package org.apache.s2graph.s2jobs.serde.writer
import org.apache.hadoop.hbase.KeyValue
import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
import org.apache.s2graph.s2jobs.serde.GraphElementWritable
-class KeyValueWriter(option: GraphFileOptions) extends GraphElementWritable[Seq[KeyValue]] {
+class KeyValueWriter(autoEdgeCreate: Boolean = false,
+ skipError: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] {
override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = {
- S2GraphHelper.toSKeyValues(s2, element, option).map { skv =>
- new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
+ try {
+ S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv =>
+ new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
+ }
+ } catch {
+ case e: Exception =>
+ if (skipError) Nil else throw e
+ }
+ }
+
+ override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Seq[KeyValue] = {
+ try {
+ DegreeKey.toKeyValue(s2, degreeKey, count)
+ } catch {
+ case e: Exception =>
+ if (skipError) Nil else throw e
}
}
}
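
This is the skipError option from the commit message at work: with the default strict writer a single non-serializable element fails the whole job, while skipError = true degrades the failure to an empty Seq so the offending record is dropped. A minimal sketch, assuming an S2Graph instance s2 and a hypothetical brokenElement whose label no longer resolves:

    import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter

    val strict  = new KeyValueWriter()                 // autoEdgeCreate = false, skipError = false
    val lenient = new KeyValueWriter(skipError = true)

    // strict.write(s2)(brokenElement)          // rethrows the serialization exception
    val kvs = lenient.write(s2)(brokenElement)  // Nil; the bad record is skipped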
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 3d5beb6..ab32fad 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,24 +19,34 @@
package org.apache.s2graph.s2jobs.task
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
import org.elasticsearch.spark.sql.EsSparkSQL
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
/**
* Sink
+ *
* @param queryName
* @param conf
*/
-abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
+abstract class Sink(queryName: String, override val conf: TaskConf) extends Task {
val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}/${conf.name}"
val DEFAULT_TRIGGER_INTERVAL = "10 seconds"
- val FORMAT:String
+ val FORMAT: String
- def preprocess(df:DataFrame):DataFrame = df
+ def preprocess(df: DataFrame): DataFrame = df
- def write(inputDF: DataFrame):Unit = {
+ def write(inputDF: DataFrame): Unit = {
val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
if (inputDF.isStreaming) writeStream(df.writeStream)
@@ -50,7 +60,7 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
case "update" => OutputMode.Update()
case "complete" => OutputMode.Complete()
case _ => logger.warn(s"${LOG_PREFIX} unsupported output mode. use default output mode 'append'")
- OutputMode.Append()
+ OutputMode.Append()
}
val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
@@ -88,9 +98,9 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
writer.save(outputPath)
}
- protected def repartition(df:DataFrame, defaultParallelism:Int) = {
+ protected def repartition(df: DataFrame, defaultParallelism: Int) = {
conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match {
- case Some(numOfPartitions:Int) =>
+ case Some(numOfPartitions: Int) =>
if (numOfPartitions > defaultParallelism) df.repartition(numOfPartitions)
else df.coalesce(numOfPartitions)
case None => df
@@ -100,14 +110,16 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
/**
* KafkaSink
+ *
* @param queryName
* @param conf
*/
-class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class KafkaSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "topic")
+
override val FORMAT: String = "kafka"
- override def preprocess(df:DataFrame):DataFrame = {
+ override def preprocess(df: DataFrame): DataFrame = {
import org.apache.spark.sql.functions._
logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
@@ -118,7 +130,7 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
val columns = df.columns
df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value"))
- case format:String =>
+ case format: String =>
if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format '$format'. use default json format")
df.selectExpr("to_json(struct(*)) AS value")
}
@@ -130,21 +142,25 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
/**
* FileSink
+ *
* @param queryName
* @param conf
*/
-class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class FileSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
override def mandatoryOptions: Set[String] = Set("path", "format")
+
override val FORMAT: String = conf.options.getOrElse("format", "parquet")
}
/**
* HiveSink
+ *
* @param queryName
* @param conf
*/
-class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class HiveSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
override def mandatoryOptions: Set[String] = Set("database", "table")
+
override val FORMAT: String = "hive"
override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
@@ -161,11 +177,13 @@ class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
/**
* ESSink
+ *
* @param queryName
* @param conf
*/
-class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port")
+
override val FORMAT: String = "es"
override def write(inputDF: DataFrame): Unit = {
@@ -182,14 +200,75 @@ class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
/**
* S2graphSink
+ *
* @param queryName
* @param conf
*/
-class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
override def mandatoryOptions: Set[String] = Set()
+
override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
- override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
- throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}")
+ private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
+ val options = TaskConf.toGraphFileOptions(conf)
+ val config = Management.toConfig(options.toConfigParams)
+ val input = df.rdd
+
+ val transformer = new SparkBulkLoaderTransformer(config, options)
+
+ implicit val reader = new RowBulkFormatReader
+ implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
+
+ val kvs = transformer.transform(input)
+
+ HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+    // finish the bulk load by executing LoadIncrementalHFiles.
+ if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(options)
+ }
+
+  private def writeBatchWithMutate(df: DataFrame): Unit = {
+ import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+ import scala.collection.JavaConversions._
+
+ val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+ val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
+
+ val reader = new RowBulkFormatReader
+
+ val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+ val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
+
+ df.foreachPartition { iters =>
+ val config = ConfigFactory.parseString(serializedConfig)
+ val s2Graph = S2GraphHelper.initS2Graph(config)
+
+ val responses = iters.grouped(groupedSize).flatMap { rows =>
+ val elements = rows.flatMap(row => reader.read(s2Graph)(row))
+
+ val mutateF = s2Graph.mutateElements(elements, true)
+ Await.result(mutateF, Duration(waitTime, "seconds"))
+ }
+
+ val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
+ logger.info(s"success : ${success.size}, fail : ${fail.size}")
+ }
+ }
+
+ override def write(inputDF: DataFrame): Unit = {
+ val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+ if (inputDF.isStreaming) writeStream(df.writeStream)
+ else {
+ conf.options.getOrElse("writeMethod", "mutate") match {
+ case "mutate" => writeBatchWithMutate(df)
+ case "bulk" =>
+ val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean
+ writeBatchBulkload(df, runLoadIncrementalHFiles)
+          case writeMethod: String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid methods: mutate, bulk)")
+ }
+ }
+ }
}
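
The batch path of S2graphSink is now selected by a writeMethod option: "mutate" (the default) goes through the online mutateElements API, while "bulk" reuses the bulk-load pipeline to write HFiles. A configuration sketch, assuming a DataFrame df in the row format accepted by RowBulkFormatReader and whatever "--"-prefixed bulk-load keys GraphFileOptions expects:

    import org.apache.s2graph.s2jobs.task.{S2graphSink, TaskConf}

    val conf = TaskConf(
      name    = "toS2graph",
      `type`  = "sink",
      inputs  = Seq("input"),
      options = Map(
        "writeMethod"              -> "bulk",
        "runLoadIncrementalHFiles" -> "true"
        // plus the "--"-prefixed GraphFileOptions keys, e.g. table name and zk quorum
      ))

    new S2graphSink("testQuery", conf).write(df)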
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index e8f11e3..ddd56bf 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -20,7 +20,16 @@
package org.apache.s2graph.s2jobs.task
import org.apache.s2graph.s2jobs.Logger
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+object TaskConf {
+ def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+ val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
+ .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+
+ GraphFileOptions.toOption(args)
+ }
+}
case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
trait Task extends Serializable with Logger {
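
TaskConf.toGraphFileOptions bridges the flat options map and the existing command-line parser: keys found in GraphFileOptions.OptionKeys are interleaved with their values into an argv-style array and handed to GraphFileOptions.toOption. A sketch of the round trip, assuming keys keep the "--" prefix that options.toCommand emits:

    import org.apache.s2graph.s2jobs.loader.GraphFileOptions
    import org.apache.s2graph.s2jobs.task.TaskConf

    val taskConf = TaskConf("dummy", "sink", Nil, Map(
      "--input"        -> "/tmp/test.txt",
      "--tempDir"      -> "/tmp/bulkload_tmp",
      "db.default.url" -> "jdbc:mysql://localhost:3306/graph_dev" // not in OptionKeys, filtered out
    ))

    val graphFileOptions: GraphFileOptions = TaskConf.toGraphFileOptions(taskConf)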
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index e15efe8..f6fecd7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.spark.sql.streaming
import com.typesafe.config.ConfigFactory
import org.apache.s2graph.core.{GraphElement, JSONParser}
+import org.apache.s2graph.s2jobs.S2GraphHelper
import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
@@ -83,51 +84,6 @@ private [sql] class S2StreamQueryWriter(
}
}
- private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = {
- val s2Graph = s2SinkContext.getGraph
- val row = encoder.fromRow(internalRow)
-
- val timestamp = row.getAs[Long]("timestamp")
- val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
- val elem = Try(row.getAs[String]("elem")).getOrElse("e")
-
- val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
- case Some(propsStr:String) =>
- JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
- case None =>
- schema.fieldNames.flatMap { field =>
- if (!RESERVED_COLUMN.contains(field)) {
- Seq(
- field -> getRowValAny(row, field)
- )
- } else Nil
- }.toMap
- }
-
- elem match {
- case "e" | "edge" =>
- val from = getRowValAny(row, "from")
- val to = getRowValAny(row, "to")
- val label = row.getAs[String]("label")
- val direction = Try(row.getAs[String]("direction")).getOrElse("out")
- Some(
- s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
- )
- case "v" | "vertex" =>
- val id = getRowValAny(row, "id")
- val serviceName = row.getAs[String]("service")
- val columnName = row.getAs[String]("column")
- Some(
- s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
- )
- case _ =>
- logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
- None
- }
- }
-
- private def getRowValAny(row:Row, fieldName:String):Any = {
- val idx = row.fieldIndex(fieldName)
- row.get(idx)
- }
+  private def rowToEdge(internalRow: InternalRow): Option[GraphElement] =
+ S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 78000d4..0461d1e 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs
import java.io.{File, PrintWriter}
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
import org.apache.s2graph.core.{Management, S2Graph}
@@ -31,11 +32,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import scala.util.Try
-class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
- private val master = "local[2]"
- private val appName = "example-spark"
+class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
- protected var sc: SparkContext = _
protected val options = GraphFileOptions(
input = "/tmp/test.txt",
tempDir = "/tmp/bulkload_tmp",
@@ -65,18 +63,15 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
override def beforeAll(): Unit = {
// initialize spark context.
- val conf = new SparkConf()
- .setMaster(master)
- .setAppName(appName)
-
- sc = new SparkContext(conf)
+ super.beforeAll()
s2 = S2GraphHelper.initS2Graph(s2Config)
initTestDataFile
}
override def afterAll(): Unit = {
- if (sc != null) sc.stop()
+ super.afterAll()
+
if (s2 != null) s2.shutdown()
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
index 6e89466..f2b0102 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -19,6 +19,17 @@
package org.apache.s2graph.s2jobs
-class S2GraphHelperTest {
+import org.apache.s2graph.s2jobs.task.TaskConf
+class S2GraphHelperTest extends BaseSparkTest {
+ test("toGraphFileOptions") {
+ val args = options.toCommand.grouped(2).map { kv =>
+ kv.head -> kv.last
+ }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql")
+
+ println(args)
+ val taskConf = TaskConf("dummy", "sink", Nil, args)
+    val graphFileOptions = TaskConf.toGraphFileOptions(taskConf)
+ println(graphFileOptions)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 3bd1a23..991897b 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,12 +19,16 @@
package org.apache.s2graph.s2jobs.loader
-import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.{PostProcess, S2VertexLike}
import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
import org.apache.s2graph.s2jobs.BaseSparkTest
-import org.apache.s2graph.s2jobs.serde.{LocalBulkLoaderTransformer, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.spark.rdd.RDD
import play.api.libs.json.Json
+import scala.io.Source
+
class GraphFileGeneratorTest extends BaseSparkTest {
import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
@@ -34,8 +38,12 @@ class GraphFileGeneratorTest extends BaseSparkTest {
def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = {
transformerMode match {
case "spark" =>
- val input = sc.parallelize(edges)
+ val input: RDD[String] = sc.parallelize(edges)
val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+ implicit val reader = new TsvBulkFormatReader
+ implicit val writer = new KeyValueWriter
+
val kvs = transformer.transform(input)
kvs.flatMap { kvs =>
kvs.map { kv =>
@@ -46,12 +54,43 @@ class GraphFileGeneratorTest extends BaseSparkTest {
case "local" =>
val input = edges
val transformer = new LocalBulkLoaderTransformer(s2Config, options)
+
+ implicit val reader = new TsvBulkFormatReader
+ implicit val writer = new KeyValueWriter
+
val kvs = transformer.transform(input)
kvs.flatMap { kvs =>
kvs.map { kv =>
CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
}
}.toList
+
+ case "dataset" =>
+ import spark.sqlContext.implicits._
+ val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+ val rows = elements.map { e =>
+ (e.getTs(),
+ e.getOperation(),
+ "e",
+ e.srcVertex.innerIdVal.toString,
+ e.tgtVertex.innerIdVal.toString,
+ e.label(),
+ "{}",
+ e.getDirection())
+ }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction").rdd
+
+ val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+ implicit val reader = new RowBulkFormatReader
+ implicit val writer = new KeyValueWriter
+
+ val kvs = transformer.transform(rows)
+ kvs.flatMap { kvs =>
+ kvs.map { kv =>
+ CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
+ }
+ }.collect().toList
}
}
@@ -61,7 +100,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
- val transformerMode = "spark"
+ val transformerMode = "dataset"
val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
val serDe = s2.defaultStorage.serDe
@@ -143,42 +182,36 @@ class GraphFileGeneratorTest extends BaseSparkTest {
}
// this test case expects options.input to already exist in valid bulk-load format.
- // test("bulk load and fetch vertex: spark mode") {
- // import scala.collection.JavaConverters._
- // val serviceColumn = initTestVertexSchema(s2)
- //
- // val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
- // val input = sc.parallelize(bulkVertexLs)
- //
- // HFileGenerator.generate(sc, s2Config, input, options)
- //
- // val hfileArgs = Array(options.output, options.tableName)
- // val hbaseConfig = HBaseConfiguration.create()
- //
- // val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
- //
- // val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
- // val json = PostProcess.verticesToJson(s2Vertices)
- //
- // println(Json.prettyPrint(json))
- // }
+ test("bulk load and fetch vertex: spark mode") {
+ import scala.collection.JavaConverters._
+ val serviceColumn = initTestVertexSchema(s2)
+
+ val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+ val input = sc.parallelize(bulkVertexLs)
+
+ HFileGenerator.generate(sc, s2Config, input, options)
+    HFileGenerator.loadIncrementalHFiles(options)
+
+ val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+ val json = PostProcess.verticesToJson(s2Vertices)
+
+ println(Json.prettyPrint(json))
+ }
// this test case expects options.input to already exist in valid bulk-load format.
- // test("bulk load and fetch vertex: mr mode") {
- // val serviceColumn = initTestVertexSchema(s2)
- //
- // val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
- // val input = sc.parallelize(bulkVertexLs)
- //
- // HFileMRGenerator.generate(sc, s2Config, input, options)
- //
- // val hfileArgs = Array(options.output, options.tableName)
- // val hbaseConfig = HBaseConfiguration.create()
- //
- // val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
- // val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
- // val json = PostProcess.verticesToJson(s2Vertices)
- //
- // println(Json.prettyPrint(json))
- // }
+// test("bulk load and fetch vertex: mr mode") {
+// import scala.collection.JavaConverters._
+// val serviceColumn = initTestVertexSchema(s2)
+//
+// val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+// val input = sc.parallelize(bulkVertexLs)
+//
+// HFileMRGenerator.generate(sc, s2Config, input, options)
+//    HFileGenerator.loadIncrementalHFiles(options)
+//
+// val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+// val json = PostProcess.verticesToJson(s2Vertices)
+//
+// println(Json.prettyPrint(json))
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1b8a13c3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
new file mode 100644
index 0000000..47d4628
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.JavaConverters._
+
+class SinkTest extends BaseSparkTest {
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+ }
+ def toDataFrame(edges: Seq[String]): DataFrame = {
+ import spark.sqlContext.implicits._
+ val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+ elements.map { e =>
+ (e.getTs(),
+ e.getOperation(),
+ "e",
+ e.srcVertex.innerIdVal.toString,
+ e.tgtVertex.innerIdVal.toString,
+ e.label(),
+ "{}",
+ e.getDirection())
+ }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+ }
+
+ test("S2graphSink writeBatchWithBulkload") {
+ val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+ val df = toDataFrame(Seq(bulkEdgeString))
+ val args = Map("writeMethod" -> "bulk") ++
+ options.toCommand.grouped(2).map { kv =>
+ kv.head -> kv.last
+ }.toMap
+
+ val conf = TaskConf("test", "sql", Seq("input"), args)
+
+ val sink = new S2graphSink("testQuery", conf)
+ sink.write(df)
+
+ val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+ s2Edges.foreach { edge => println(edge) }
+
+ val filteredEdges = s2Edges.filter{ edge =>
+ edge.srcVertex.innerIdVal.toString == "a" &&
+ edge.tgtVertex.innerIdVal.toString == "b" &&
+ edge.label() == "friends"
+ }
+
+ assert(filteredEdges.size == 1)
+ }
+
+ test("S2graphSink writeBatchWithMutate") {
+ val bulkEdgeString = "1416236400000\tinsert\tedge\tb\tc\tfriends\t{\"since\":1316236400000,\"score\":20}"
+ val df = toDataFrame(Seq(bulkEdgeString))
+ val args = Map("writeMethod" -> "mutate") ++
+ options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap
+
+ val conf = TaskConf("test", "sql", Seq("input"), args)
+
+ val sink = new S2graphSink("testQuery", conf)
+ sink.write(df)
+
+ val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+ s2Edges.foreach { edge => println(edge) }
+
+ val filteredEdges = s2Edges.filter{ edge =>
+ edge.srcVertex.innerIdVal.toString == "b" &&
+ edge.tgtVertex.innerIdVal.toString == "c" &&
+ edge.getTs() == 1416236400000L &&
+ edge.label() == "friends"
+ }
+
+ assert(filteredEdges.size == 1)
+ }
+
+}