You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/10/30 10:23:32 UTC
[4/8] incubator-s2graph git commit: Separate interfaces from Storage.
Separate interfaces from Storage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/39544dc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/39544dc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/39544dc5
Branch: refs/heads/master
Commit: 39544dc5debd4821c4f73db17e88bdbeb9f43b38
Parents: 3361320
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Oct 28 09:27:51 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Oct 28 09:27:51 2017 +0900
----------------------------------------------------------------------
.gitignore | 2 +-
s2core/build.sbt | 3 +-
.../org/apache/s2graph/core/Management.scala | 74 +-
.../org/apache/s2graph/core/QueryParam.scala | 2 +-
.../scala/org/apache/s2graph/core/S2Edge.scala | 2 +-
.../scala/org/apache/s2graph/core/S2Graph.scala | 275 +++--
.../org/apache/s2graph/core/S2Vertex.scala | 40 +-
.../s2graph/core/storage/Deserializable.scala | 43 -
.../s2graph/core/storage/MutateResponse.scala | 12 +
.../apache/s2graph/core/storage/SKeyValue.scala | 1 +
.../s2graph/core/storage/Serializable.scala | 27 -
.../apache/s2graph/core/storage/Storage.scala | 1146 ++----------------
.../core/storage/StorageDeserializable.scala | 120 --
.../apache/s2graph/core/storage/StorageIO.scala | 241 ++++
.../core/storage/StorageManagement.scala | 35 +
.../s2graph/core/storage/StorageReadable.scala | 62 +
.../s2graph/core/storage/StorageSerDe.scala | 59 +
.../core/storage/StorageSerializable.scala | 82 --
.../s2graph/core/storage/StorageWritable.scala | 45 +
.../storage/WriteWriteConflictResolver.scala | 438 +++++++
.../core/storage/hbase/AsynchbaseStorage.scala | 764 +-----------
.../hbase/AsynchbaseStorageManagement.scala | 263 ++++
.../hbase/AsynchbaseStorageReadable.scala | 335 +++++
.../storage/hbase/AsynchbaseStorageSerDe.scala | 68 ++
.../hbase/AsynchbaseStorageWritable.scala | 118 ++
.../core/storage/serde/Deserializable.scala | 41 +
.../core/storage/serde/Serializable.scala | 27 +
.../storage/serde/StorageDeserializable.scala | 144 +++
.../storage/serde/StorageSerializable.scala | 90 ++
.../tall/IndexEdgeDeserializable.scala | 12 +-
.../indexedge/tall/IndexEdgeSerializable.scala | 5 +-
.../wide/IndexEdgeDeserializable.scala | 4 +-
.../indexedge/wide/IndexEdgeSerializable.scala | 5 +-
.../tall/SnapshotEdgeDeserializable.scala | 7 +-
.../tall/SnapshotEdgeSerializable.scala | 4 +-
.../wide/SnapshotEdgeDeserializable.scala | 9 +-
.../wide/SnapshotEdgeSerializable.scala | 4 +-
.../serde/vertex/VertexDeserializable.scala | 146 +--
.../serde/vertex/VertexSerializable.scala | 114 +-
.../vertex/tall/VertexDeserializable.scala | 58 +
.../serde/vertex/tall/VertexSerializable.scala | 54 +
.../vertex/wide/VertexDeserializable.scala | 73 ++
.../serde/vertex/wide/VertexSerializable.scala | 52 +
.../apache/s2graph/core/utils/DeferCache.scala | 8 +-
.../s2graph/core/Integrate/CrudTest.scala | 8 +-
.../LabelLabelIndexMutateOptionTest.scala | 6 +-
.../s2graph/core/storage/StorageIOTest.scala | 59 +
.../core/storage/hbase/IndexEdgeTest.scala | 3 +-
.../core/storage/rocks/RocksStorageTest.scala | 33 +
.../rest/play/controllers/EdgeController.scala | 11 +-
.../play/controllers/VertexController.scala | 5 +-
51 files changed, 2913 insertions(+), 2326 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 9f295f2..ad5a5e9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -107,4 +107,4 @@ server.pid
.cache
### Local Embedded HBase Data ###
-storage/
+#storage/
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 8033581..e7e602f 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -48,7 +48,8 @@ libraryDependencies ++= Seq(
"org.specs2" %% "specs2-core" % specs2Version % "test",
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ,
"org.apache.lucene" % "lucene-core" % "6.6.0",
- "org.apache.lucene" % "lucene-queryparser" % "6.6.0"
+ "org.apache.lucene" % "lucene-queryparser" % "6.6.0",
+ "org.rocksdb" % "rocksdbjni" % "5.8.0"
)
libraryDependencies := {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index a9741d2..49d3c0e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core
import java.util
+import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.mysqls._
@@ -36,9 +37,22 @@ import scala.util.Try
* s2core never use this for finding models.
*/
object Management {
-
+ import HBaseType._
import scala.collection.JavaConversions._
+ val ZookeeperQuorum = "hbase.zookeeper.quorum"
+ val ColumnFamilies = "hbase.table.column.family"
+ val RegionMultiplier = "hbase.table.region.multiplier"
+ val Ttl = "hbase.table.ttl"
+ val CompressionAlgorithm = "hbase.table.compression.algorithm"
+ val ReplicationScope = "hbase.table.replication.scope"
+ val TotalRegionCount = "hbase.table.total.region.count"
+
+ val DefaultColumnFamilies = Seq("e", "v")
+ val DefaultCompressionAlgorithm = "gz"
+ val LABEL_NAME_MAX_LENGTH = 100
+
+
def newProp(name: String, defaultValue: String, datatType: String): Prop = {
new Prop(name, defaultValue, datatType)
}
@@ -56,10 +70,6 @@ object Management {
case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None)
}
- import HBaseType._
-
- val LABEL_NAME_MAX_LENGTH = 100
- val DefaultCompressionAlgorithm = "gz"
def findService(serviceName: String) = {
Service.findByName(serviceName, useCache = false)
@@ -263,6 +273,24 @@ object Management {
Label.updateName(tempLabel, rightLabel)
}
}
+ def toConfig(params: Map[String, Any]): Config = {
+ import scala.collection.JavaConversions._
+
+ val filtered = params.filter { case (k, v) =>
+ v match {
+ case None => false
+ case _ => true
+ }
+ }.map { case (k, v) =>
+ val newV = v match {
+ case Some(value) => value
+ case _ => v
+ }
+ k -> newV
+ }
+
+ ConfigFactory.parseMap(filtered)
+ }
}
class Management(graph: S2Graph) {
@@ -277,7 +305,15 @@ class Management(graph: S2Graph) {
compressionAlgorithm: String = DefaultCompressionAlgorithm,
replicationScopeOpt: Option[Int] = None,
totalRegionCount: Option[Int] = None): Unit = {
- graph.defaultStorage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt, totalRegionCount)
+ val config = toConfig(Map(
+ ZookeeperQuorum -> zkAddr,
+// ColumnFamilies -> cfs,
+ RegionMultiplier -> regionMultiplier,
+ Ttl -> ttl,
+ CompressionAlgorithm -> compressionAlgorithm,
+ TotalRegionCount -> totalRegionCount
+ ))
+ graph.defaultStorage.createTable(config, tableName)
}
@@ -299,8 +335,15 @@ class Management(graph: S2Graph) {
Model withTx { implicit session =>
val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false)
+ val config = toConfig(Map(
+ ZookeeperQuorum -> service.cluster,
+// ColumnFamilies -> List("e", "v"),
+ RegionMultiplier -> service.preSplitSize,
+ Ttl -> service.hTableTTL,
+ CompressionAlgorithm -> compressionAlgorithm
+ ))
/* create hbase table for service */
- graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
+ graph.getStorage(service).createTable(config, service.hTableName)
service
}
}
@@ -390,7 +433,14 @@ class Management(graph: S2Graph) {
/* create hbase table */
val storage = graph.getStorage(newLabel)
val service = newLabel.service
- storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm)
+ val config = toConfig(Map(
+ ZookeeperQuorum -> service.cluster,
+// ColumnFamilies -> List("e", "v"),
+ RegionMultiplier -> service.preSplitSize,
+ Ttl -> newLabel.hTableTTL,
+ CompressionAlgorithm -> newLabel.compressionAlgorithm
+ ))
+ storage.createTable(config, newLabel.hbaseTableName)
newLabel
}
@@ -449,7 +499,9 @@ class Management(graph: S2Graph) {
labelOpt.map { label =>
val storage = graph.getStorage(label)
val zkAddr = label.service.cluster
- storage.truncateTable(zkAddr, label.hbaseTableName)
+
+ val config = toConfig(Map(ZookeeperQuorum -> zkAddr))
+ storage.truncateTable(config, label.hbaseTableName)
}
}
}
@@ -459,7 +511,9 @@ class Management(graph: S2Graph) {
labelOpt.map { label =>
val storage = graph.getStorage(label)
val zkAddr = label.service.cluster
- storage.deleteTable(zkAddr, label.hbaseTableName)
+
+ val config = toConfig(Map(ZookeeperQuorum -> zkAddr))
+ storage.deleteTable(config, label.hbaseTableName)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 7e95f58..1100f6c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -26,7 +26,7 @@ import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.rest.TemplateHelper
-import org.apache.s2graph.core.storage.StorageSerializable._
+import org.apache.s2graph.core.storage.serde.StorageSerializable._
import org.apache.s2graph.core.types._
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
import org.hbase.async.ColumnRangeFilter
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 7165579..51af831 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -684,7 +684,7 @@ case class S2Edge(innerGraph: S2Graph,
// should we delete related edges also?
val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true)
val mutateSuccess = Await.result(future, innerGraph.WaitTimeout)
- if (!mutateSuccess.forall(identity)) throw new RuntimeException("edge remove failed.")
+ if (!mutateSuccess.forall(_.isSuccess)) throw new RuntimeException("edge remove failed.")
} else {
throw Edge.Exceptions.edgeRemovalNotSupported()
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index d1eda5e..32724b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -22,18 +22,17 @@ package org.apache.s2graph.core
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.{Executors, TimeUnit}
-import java.util.function.Consumer
-
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
-import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, LabelNotExistException}
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.features.S2GraphVariables
-import org.apache.s2graph.core.index.{IndexProvider, LuceneIndexProvider}
+import org.apache.s2graph.core.index.IndexProvider
import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
-import org.apache.s2graph.core.storage.{SKeyValue, Storage}
+import org.apache.s2graph.core.storage.rocks.RocksStorage
+import org.apache.s2graph.core.storage.{ MutateResponse, SKeyValue, Storage}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
@@ -44,8 +43,6 @@ import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper}
import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex}
import play.api.libs.json.{JsObject, Json}
-import scalikejdbc.DBSession
-
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -137,12 +134,13 @@ object S2Graph {
new S2Graph(configuration)(ec)
}
- def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
+ def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_] = {
val storageBackend = config.getString("s2graph.storage.backend")
logger.info(s"[InitStorage]: $storageBackend")
storageBackend match {
- case "hbase" => new AsynchbaseStorage(graph, config)(ec)
+ case "hbase" => new AsynchbaseStorage(graph, config)
+ case "rocks" => new RocksStorage(graph, config)
case _ => throw new RuntimeException("not supported storage.")
}
}
@@ -912,7 +910,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
/**
* TODO: we need to some way to handle malformed configuration for storage.
*/
- val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = {
+ val storagePool: scala.collection.mutable.Map[String, Storage[_]] = {
val labels = Label.findAll()
val services = Service.findAll()
@@ -923,12 +921,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
confWithFallback(conf)
}.toSet
- val pools = new java.util.HashMap[Config, Storage[_, _]]()
+ val pools = new java.util.HashMap[Config, Storage[_]]()
configs.foreach { config =>
pools.put(config, S2Graph.initStorage(this, config)(ec))
}
- val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]()
+ val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_]]()
labels.foreach { label =>
if (label.storageConfigOpt.isDefined) {
@@ -945,7 +943,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
m
}
- val defaultStorage: Storage[_, _] = S2Graph.initStorage(this, config)(ec)
+ val defaultStorage: Storage[_] = S2Graph.initStorage(this, config)(ec)
/** QueryLevel FutureCache */
val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
@@ -957,11 +955,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val indexProvider = IndexProvider.apply(config)
- def getStorage(service: Service): Storage[_, _] = {
+ def getStorage(service: Service): Storage[_] = {
storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
}
- def getStorage(label: Label): Storage[_, _] = {
+ def getStorage(label: Label): Storage[_] = {
storagePool.getOrElse(s"label:${label.label}", defaultStorage)
}
@@ -979,8 +977,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val futures = for {
edge <- edges
} yield {
- fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
- edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+ getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (_, edgeOpt, _) =>
+ edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel))
}
}
@@ -1147,39 +1145,33 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
fallback
} get
}
+
+ def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
+ def getVertices[Q](storage: Storage[Q])(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
+ def fromResult(kvs: Seq[SKeyValue],
+ version: String): Option[S2Vertex] = {
+ if (kvs.isEmpty) None
+ else storage.vertexDeserializer(version).fromKeyValues(kvs, None)
+ // .map(S2Vertex(graph, _))
+ }
-
- def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
- /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache
- * so use empty cacheKey.
- * */
- val queryParam = QueryParam(labelName = edge.innerLabel.label,
- direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
- tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
- cacheTTLInMillis = -1)
- val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
- val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
-
- val storage = getStorage(edge.innerLabel)
- storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
- val (edgeOpt, kvOpt) =
- if (kvs.isEmpty) (None, None)
- else {
- val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
- val _kvOpt = kvs.headOption
- (snapshotEdgeOpt, _kvOpt)
+ val futures = vertices.map { vertex =>
+ val queryParam = QueryParam.Empty
+ val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+ val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+ val rpc = storage.buildRequest(queryRequest, vertex)
+ storage.fetchKeyValues(rpc).map { kvs =>
+ fromResult(kvs, vertex.serviceColumn.schemaVersion)
+ } recoverWith { case ex: Throwable =>
+ Future.successful(None)
}
- (queryParam, edgeOpt, kvOpt)
- } recoverWith { case ex: Throwable =>
- logger.error(s"fetchQueryParam failed. fallback return.", ex)
- throw FetchTimeoutException(s"${edge.toLogString}")
- }
- }
+ }
- def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
+ Future.sequence(futures).map { result => result.toList.flatten }
+ }
val verticesWithIdx = vertices.zipWithIndex
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
- getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
+ getVertices(getStorage(service))(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
}
Future.sequence(futures).map { ls =>
@@ -1221,8 +1213,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
+ val futures = queries.map(getEdgesStepInner(_, true))
val future = for {
- stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true)))
+ stepInnerResultLs <- Future.sequence(futures)
(allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
} yield {
// logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
@@ -1241,6 +1234,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
requestTs: Long): Future[(Boolean, Boolean)] = {
stepInnerResultLs.foreach { stepInnerResult =>
+ logger.error(s"[!!!!!!]: ${stepInnerResult.edgeWithScores.size}")
if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
}
val futures = for {
@@ -1257,9 +1251,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
* read: snapshotEdge on queryResult = O(N)
* write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
*/
- mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity))
+ mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
} else {
- getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+ deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum)
}
case _ =>
@@ -1267,7 +1261,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
- getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+ deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum)
}
ret
}
@@ -1280,6 +1274,44 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
+ private def deleteAllFetchedEdgesAsyncOld(storage: Storage[_])(stepInnerResult: StepResult,
+ requestTs: Long,
+ retryNum: Int): Future[Boolean] = {
+ if (stepInnerResult.isEmpty) Future.successful(true)
+ else {
+ val head = stepInnerResult.edgeWithScores.head
+ val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+ val futures = for {
+ edgeWithScore <- stepInnerResult.edgeWithScores
+ } yield {
+ val edge = edgeWithScore.edge
+ val score = edgeWithScore.score
+
+ val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedSnapshotEdgeMutations = storage.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+ val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+ val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+ storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ storage.buildIncrementsAsync(indexEdge, -1L)
+ }
+
+ /* reverted direction */
+ val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+ storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ storage.buildIncrementsAsync(indexEdge, -1L)
+ }
+
+ val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+ storage.writeToStorage(zkQuorum, mutations, withWait = true)
+ }
+
+ Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+ }
+ }
+
def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
(edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
@@ -1297,20 +1329,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
case _ =>
edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
}
-// val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
-// case "strong" =>
-// val edge = edgeWithScore.edge
-// edge.property(LabelMeta.timestamp.name, requestTs)
-// val _newPropsWithTs = edge.updatePropsWithTs()
-//
-// (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
-// case _ =>
-// val oldEdge = edgeWithScore.edge
-// (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
-// }
-//
-// val copiedEdge =
-// edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
@@ -1321,11 +1339,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
- // def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
- // storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
def mutateElements(elements: Seq[GraphElement],
- withWait: Boolean = false): Future[Seq[Boolean]] = {
+ withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val edgeBuffer = ArrayBuffer[(S2Edge, Int)]()
val vertexBuffer = ArrayBuffer[(S2Vertex, Int)]()
@@ -1355,7 +1371,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
// def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
- def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[Boolean]] = {
+ def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val edgeWithIdxs = edges.zipWithIndex
val (strongEdges, weakEdges) =
@@ -1383,7 +1399,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
- idxs.map(idx => idx -> ret)
+ idxs.map(idx => idx -> ret.isSuccess)
}
}
Future.sequence(futures)
@@ -1398,7 +1414,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
val storage = getStorage(label)
- storage.mutateStrongEdges(edges, withWait = true).map { rets =>
+ mutateStrongEdges(storage)(edges, withWait = true).map { rets =>
idxs.zip(rets)
}
}
@@ -1408,27 +1424,130 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
deleteAll <- Future.sequence(deleteAllFutures)
strong <- Future.sequence(strongEdgesFutures)
} yield {
- (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2)
+ (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(r => new MutateResponse(r._2))
}
}
- def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = {
+ private def mutateStrongEdges(storage: Storage[_])(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
+
+ val edgeWithIdxs = _edges.zipWithIndex
+ val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+ (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+ } toSeq
+
+ val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+ val edges = edgeGroup.map(_._1)
+ val idxs = edgeGroup.map(_._2)
+ // After deleteAll, process others
+ val mutateEdgeFutures = edges.toList match {
+ case head :: tail =>
+ val edgeFuture = mutateEdgesInner(storage)(edges, checkConsistency = true , withWait)
+
+ //TODO: decide what we will do on failure on vertex put
+ val puts = storage.buildVertexPutsAsync(head)
+ val vertexFuture = storage.writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+ Seq(edgeFuture, vertexFuture)
+ case Nil => Nil
+ }
+
+ val composed = for {
+ // deleteRet <- Future.sequence(deleteAllFutures)
+ mutateRet <- Future.sequence(mutateEdgeFutures)
+ } yield mutateRet
+
+ composed.map(_.forall(_.isSuccess)).map { ret => idxs.map( idx => idx -> ret) }
+ }
+
+ Future.sequence(mutateEdges).map { squashedRets =>
+ squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+ }
+ }
+
+
+ private def mutateEdgesInner(storage: Storage[_])(edges: Seq[S2Edge],
+ checkConsistency: Boolean,
+ withWait: Boolean): Future[MutateResponse] = {
+ assert(edges.nonEmpty)
+ // TODO:: remove after code review: unreachable code
+ if (!checkConsistency) {
+
+ val zkQuorum = edges.head.innerLabel.hbaseZkAddr
+ val futures = edges.map { edge =>
+ val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+ val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
+ val mutations =
+ storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+ if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+ storage.writeToStorage(zkQuorum, mutations, withWait)
+ }
+ Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+ } else {
+ storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+ storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+ }
+ }
+ }
+
+ def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
+ def mutateVertex(storage: Storage[_])(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = {
+ if (vertex.op == GraphUtil.operations("delete")) {
+ storage.writeToStorage(vertex.hbaseZkAddr,
+ storage.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+ } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+ logger.info(s"deleteAll for vertex is truncated. $vertex")
+ Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
+ } else {
+ storage.writeToStorage(vertex.hbaseZkAddr, storage.buildPutsAll(vertex), withWait)
+ }
+ }
+
+ def mutateVertices(storage: Storage[_])(vertices: Seq[S2Vertex],
+ withWait: Boolean = false): Future[Seq[MutateResponse]] = {
+ val futures = vertices.map { vertex => mutateVertex(storage)(vertex, withWait) }
+ Future.sequence(futures)
+ }
+
val verticesWithIdx = vertices.zipWithIndex
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
- getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
+ mutateVertices(getStorage(service))(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
}
Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
}
- def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
+
+
+ def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
+ def incrementCounts(storage: Storage[_])(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
+ val futures = for {
+ edge <- edges
+ } yield {
+ val kvs = for {
+ relEdge <- edge.relatedEdges
+ edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+ } yield {
+ val countWithTs = edge.propertyValueInner(LabelMeta.count)
+ val countVal = countWithTs.innerVal.toString().toLong
+ storage.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+ }
+ storage.writeToStorage(edge.innerLabel.hbaseZkAddr, kvs, withWait = withWait)
+ }
+
+ Future.sequence(futures)
+ }
+
val edgesWithIdx = edges.zipWithIndex
val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
- getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+ incrementCounts(getStorage(label))(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+ }
+ Future.sequence(futures).map { ls =>
+ ls.flatten.toSeq.sortBy(_._2).map(_._1)
}
- Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
}
- def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[Boolean] = {
+ def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[MutateResponse] = {
val label = edge.innerLabel
val storage = getStorage(label)
@@ -1840,7 +1959,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val vertex = newVertex(id, ts, props, op, belongLabelIds)
val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
- if (rets.forall(identity)) vertex
+ if (rets.forall(_.isSuccess)) vertex
else throw new RuntimeException("addVertex failed.")
}
Await.ready(future, WaitTimeout)
@@ -1850,7 +1969,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
def addVertexInner(vertex: S2Vertex): S2Vertex = {
val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
- if (rets.forall(identity)) {
+ if (rets.forall(_.isSuccess)) {
indexProvider.mutateVerticesAsync(Seq(vertex))
} else throw new RuntimeException("addVertex failed.")
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
index c0dc23b..177d859 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -169,6 +169,29 @@ case class S2Vertex(graph: S2Graph,
graph.fetchEdges(this, labelNameWithDirs.distinct)
}
+ private def edgesAsync(direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = { // builds (label name, direction name) pairs for this vertex and hands them to graph.fetchEdgesAsync
+ val labelNameWithDirs =
+ if (labelNames.isEmpty) { // no labels supplied: look labels up by this vertex's column id
+ // TODO: Let's clarify direction
+ if (direction == Direction.BOTH) {
+ Label.findBySrcColumnId(id.colId).map(l => l.label -> Direction.OUT.name) ++ // labels found by source column are paired with OUT
+ Label.findByTgtColumnId(id.colId).map(l => l.label -> Direction.IN.name) // labels found by target column are paired with IN
+ } else if (direction == Direction.IN) {
+ Label.findByTgtColumnId(id.colId).map(l => l.label -> direction.name)
+ } else { // any remaining direction uses the source-column lookup
+ Label.findBySrcColumnId(id.colId).map(l => l.label -> direction.name)
+ }
+ } else { // explicit labels supplied: no Label lookup is done
+ direction match {
+ case Direction.BOTH =>
+ Seq(Direction.OUT, Direction.IN).flatMap { dir => labelNames.map(_ -> dir.name()) } // every label appears once with OUT and once with IN
+ case _ => labelNames.map(_ -> direction.name())
+ }
+ }
+
+ graph.fetchEdgesAsync(this, labelNameWithDirs.distinct) // duplicate pairs are dropped before the fetch
+ }
+
// do not save to storage
def propertyInner[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = {
S2Property.assertValidProp(key, value)
@@ -242,21 +265,18 @@ case class S2Vertex(graph: S2Graph,
// remove edge
// TODO: remove related edges also.
implicit val ec = graph.ec
- val ts = System.currentTimeMillis()
- val outLabels = Label.findBySrcColumnId(id.colId)
- val inLabels = Label.findByTgtColumnId(id.colId)
+
val verticesToDelete = Seq(this.copy(op = GraphUtil.operations("delete")))
- val outFuture = graph.deleteAllAdjacentEdges(verticesToDelete, outLabels, GraphUtil.directions("out"), ts)
- val inFuture = graph.deleteAllAdjacentEdges(verticesToDelete, inLabels, GraphUtil.directions("in"), ts)
+
val vertexFuture = graph.mutateVertices(verticesToDelete, withWait = true)
+
val future = for {
- outSuccess <- outFuture
- inSuccess <- inFuture
vertexSuccess <- vertexFuture
+ edges <- edgesAsync(Direction.BOTH)
} yield {
- if (!outSuccess) throw new RuntimeException("Vertex.remove out direction edge delete failed.")
- if (!inSuccess) throw new RuntimeException("Vertex.remove in direction edge delete failed.")
- if (!vertexSuccess.forall(identity)) throw new RuntimeException("Vertex.remove vertex delete failed.")
+ edges.asScala.toSeq.foreach { edge => edge.remove() }
+ if (!vertexSuccess.forall(_.isSuccess)) throw new RuntimeException("Vertex.remove vertex delete failed.")
+
true
}
Await.result(future, graph.WaitTimeout)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
deleted file mode 100644
index af20483..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceVertexId, VertexId}
-
-
-trait Deserializable[E] extends StorageDeserializable[E] {
- import StorageDeserializable._
-
- type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int)
-
-// /** version 1 and version 2 share same code for parsing row key part */
-// def parseRow(kv: SKeyValue, version: String = HBaseType.DEFAULT_VERSION): RowKeyRaw = {
-// var pos = 0
-// val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
-// pos += srcIdLen
-// val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-// pos += 4
-// val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-//
-// val rowLen = srcIdLen + 4 + 1
-// (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-// }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala
new file mode 100644
index 0000000..bed1152
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala
@@ -0,0 +1,12 @@
+package org.apache.s2graph.core.storage
+
+ object MutateResponse { // pre-built response instances
+ val Success = new MutateResponse(isSuccess = true)
+ val Failure = new MutateResponse(isSuccess = false)
+ val IncrementFailure = new IncrementResponse(isSuccess = false, -1, -1) // both counts are -1; presumably a "no count available" placeholder, TODO confirm
+ val IncrementSuccess = new IncrementResponse(isSuccess = true, -1, -1) // success flag with both counts left at -1
+ }
+
+ class MutateResponse(val isSuccess: Boolean) // outcome of a mutation; isSuccess is the only state it carries
+
+ case class IncrementResponse(override val isSuccess: Boolean, afterCount: Long, beforeCount: Long) extends MutateResponse(isSuccess) // mutation outcome that also carries afterCount and beforeCount, in that parameter order
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index db9a9da..924d9a3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -26,6 +26,7 @@ import org.hbase.async.KeyValue
object SKeyValue {
val EdgeCf = "e".getBytes("UTF-8")
+ val VertexCf = "v".getBytes("UTF-8")
val Put = 1
val Delete = 2
val Increment = 3
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
deleted file mode 100644
index 6de0b30..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-object Serializable {
- val vertexCf = "v".getBytes("UTF-8")
- val edgeCf = "e".getBytes("UTF-8")
-}
-
-trait Serializable[E] extends StorageSerializable[E]