You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/10/30 10:23:31 UTC
[3/8] incubator-s2graph git commit: Separate interfaces from Storage.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 57d4872..c9353e1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -20,948 +20,16 @@
package org.apache.s2graph.core.storage
-import org.apache.s2graph.core.GraphExceptions.{NoStackException, FetchTimeoutException}
+import com.typesafe.config.Config
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
-import org.apache.s2graph.core.parsers.WhereParser
-import org.apache.s2graph.core.storage.serde.indexedge.wide.{IndexEdgeDeserializable, IndexEdgeSerializable}
-import org.apache.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable
-import org.apache.s2graph.core.storage.serde.vertex.{VertexDeserializable, VertexSerializable}
+import org.apache.s2graph.core.storage.serde.Deserializable
+import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable
import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Random, Try}
-import java.util.concurrent.{Executors, TimeUnit}
+import scala.concurrent.{ExecutionContext, Future}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.hadoop.hbase.util.Bytes
-
-
-abstract class Storage[Q, R](val graph: S2Graph,
- val config: Config)(implicit ec: ExecutionContext) {
- import HBaseType._
- import S2Graph._
-
- val BackoffTimeout = graph.BackoffTimeout
- val MaxRetryNum = graph.MaxRetryNum
- val MaxBackOff = graph.MaxBackOff
- val FailProb = graph.FailProb
- val LockExpireDuration = graph.LockExpireDuration
- val MaxSize = graph.MaxSize
- val ExpireAfterWrite = graph.ExpireAfterWrite
- val ExpireAfterAccess = graph.ExpireAfterAccess
-
- /** retry scheduler */
- val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
-
-
- /**
- * Compatibility table
- * | label schema version | snapshot edge | index edge | vertex | note |
- * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue |
- * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue |
- * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema |
- * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | experimental schema. use scanner instead of get |
- *
- */
-
- /**
- * create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue]
- * so we can store this kvs.
- * @param snapshotEdge: snapshotEdge to serialize
- * @return serializer implementation for StorageSerializable which has toKeyValues return Seq[SKeyValue]
- */
- def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): Serializable[SnapshotEdge] = {
- snapshotEdge.schemaVer match {
-// case VERSION1 |
- case VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
- case VERSION3 | VERSION4 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
- case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}")
- }
- }
-
- /**
- * create serializer that knows how to convert given indexEdge into kvs: Seq[SKeyValue]
- * @param indexEdge: indexEdge to serialize
- * @return serializer implementation
- */
- def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = {
- indexEdge.schemaVer match {
-// case VERSION1
- case VERSION2 | VERSION3 => new IndexEdgeSerializable(indexEdge)
- case VERSION4 => new serde.indexedge.tall.IndexEdgeSerializable(indexEdge)
- case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}")
-
- }
- }
-
- /**
- * create serializer that knows how to convert given vertex into kvs: Seq[SKeyValue]
- * @param vertex: vertex to serialize
- * @return serializer implementation
- */
- def vertexSerializer(vertex: S2Vertex): Serializable[S2Vertex] = new VertexSerializable(vertex)
-
- /**
- * create deserializer that can parse stored CanSKeyValue into snapshotEdge.
- * note that each storage implementation should implement implicit type class
- * to convert storage dependent dataType into common SKeyValue type by implementing CanSKeyValue
- *
- * ex) Asynchbase use it's KeyValue class and CanSKeyValue object has implicit type conversion method.
- * if any storage uses a different class to represent stored byte array,
- * then that storage implementation is responsible to provide implicit type conversion method on CanSKeyValue.
- * */
-
- val snapshotEdgeDeserializer: Deserializable[SnapshotEdge] = new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
-
- def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] = snapshotEdgeDeserializer
-
- /** create deserializer that can parse stored CanSKeyValue into indexEdge. */
- val indexEdgeDeserializer: Deserializable[S2Edge] = new serde.indexedge.tall.IndexEdgeDeserializable(graph)
-
- def indexEdgeDeserializer(schemaVer: String) = new serde.indexedge.tall.IndexEdgeDeserializable(graph)
-
- /** create deserializer that can parse stored CanSKeyValue into vertex. */
- val vertexDeserializer: Deserializable[S2Vertex] = new VertexDeserializable(graph)
-
-
- /**
- * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
- * note that this should be return true on all success.
- * we assumes that each storage implementation has client as member variable.
- *
- *
- * @param cluster: where this key values should be stored.
- * @param kvs: sequence of SKeyValue that need to be stored in storage.
- * @param withWait: flag to control wait ack from storage.
- * note that in AsynchbaseStorage(which support asynchronous operations), even with true,
- * it never block thread, but rather submit work and notified by event loop when storage send ack back.
- * @return ack message from storage.
- */
- def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean]
-
-// def writeToStorage(kv: SKeyValue, withWait: Boolean): Future[Boolean]
-
- /**
- * fetch SnapshotEdge for given request from storage.
- * also storage datatype should be converted into SKeyValue.
- * note that return type is Sequence rather than single SKeyValue for simplicity,
- * even though there is assertions sequence.length == 1.
- * @param request
- * @return
- */
- def fetchSnapshotEdgeKeyValues(request: QueryRequest): Future[Seq[SKeyValue]]
-
- /**
- * write requestKeyValue into storage if the current value in storage that is stored matches.
- * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge.
- *
- * Most important thing is this have to be 'atomic' operation.
- * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be
- * either blocked or failed on write-write conflict case.
- *
- * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to
- * prevent wrong data for read.
- *
- * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction,
- * compareAndSet to synchronize.
- *
- * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
- * for storage that does not support concurrency control, then storage implementation
- * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
- * and write(writeLock).
- * @param requestKeyValue
- * @param expectedOpt
- * @return
- */
- def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean]
-
- /**
- * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
- * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build
- * client request(GetRequest, Scanner) based on user provided query.
- *
- * @param queryRequest
- * @return
- */
- protected def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q
-
- /**
- * fetch IndexEdges for given queryParam in queryRequest.
- * this expect previous step starting score to propagate score into next step.
- * also parentEdges is necessary to return full bfs tree when query require it.
- *
- * note that return type is general type.
- * for example, currently we wanted to use Asynchbase
- * so single I/O return type should be Deferred[T].
- *
- * if we use native hbase client, then this return type can be Future[T] or just T.
- *
- * @param queryRequest
- * @param isInnerCall
- * @param parentEdges
- * @return
- */
- def fetch(queryRequest: QueryRequest,
- isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore]): R
-
- /**
- * responsible to fire parallel fetch call into storage and create future that will return merged result.
- *
- * @param queryRequests
- * @param prevStepEdges
- * @return
- */
- def fetches(queryRequests: Seq[QueryRequest],
- prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]]
-
- /**
- * fetch Vertex for given request from storage.
- *
- * @param request
- * @return
- */
- def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]]
-
- /**
- * decide how to apply given edges(indexProps values + Map(_count -> countVal)) into storage.
- *
- * @param edges
- * @param withWait
- * @return
- */
- def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]]
-
- /**
- * this method need to be called when client shutdown. this is responsible to cleanUp the resources
- * such as client into storage.
- */
- def flush(): Unit = {
- }
-
- def fetchEdgesAll(): Future[Seq[S2Edge]]
-
- def fetchVerticesAll(): Future[Seq[S2Vertex]]
-
- /**
- * create table on storage.
- * if storage implementation does not support namespace or table, then there is nothing to be done
- *
- * @param zkAddr
- * @param tableName
- * @param cfs
- * @param regionMultiplier
- * @param ttl
- * @param compressionAlgorithm
- */
- def createTable(zkAddr: String,
- tableName: String,
- cfs: List[String],
- regionMultiplier: Int,
- ttl: Option[Int],
- compressionAlgorithm: String,
- replicationScopeOpt: Option[Int] = None,
- totalRegionCount: Option[Int] = None): Unit
-
- def truncateTable(zkAddr: String, tableNameStr: String): Unit = {}
-
- def deleteTable(zkAddr: String, tableNameStr: String): Unit = {}
-
- def shutdown(): Unit
-
- /** Public Interface */
- def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
- def fromResult(kvs: Seq[SKeyValue],
- version: String): Option[S2Vertex] = {
- if (kvs.isEmpty) None
- else vertexDeserializer.fromKeyValues(kvs, None)
-// .map(S2Vertex(graph, _))
- }
-
- val futures = vertices.map { vertex =>
- val queryParam = QueryParam.Empty
- val q = Query.toQuery(Seq(vertex), Seq(queryParam))
- val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-
- fetchVertexKeyValues(queryRequest).map { kvs =>
- fromResult(kvs, vertex.serviceColumn.schemaVersion)
- } recoverWith { case ex: Throwable =>
- Future.successful(None)
- }
- }
-
- Future.sequence(futures).map { result => result.toList.flatten }
- }
- def mutateStrongEdges(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
-
- val edgeWithIdxs = _edges.zipWithIndex
- val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
- (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
- } toSeq
-
- val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
- val edges = edgeGroup.map(_._1)
- val idxs = edgeGroup.map(_._2)
- // After deleteAll, process others
- val mutateEdgeFutures = edges.toList match {
- case head :: tail =>
- val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , withWait)
-
- //TODO: decide what we will do on failure on vertex put
- val puts = buildVertexPutsAsync(head)
- val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
- Seq(edgeFuture, vertexFuture)
- case Nil => Nil
- }
-
- val composed = for {
-// deleteRet <- Future.sequence(deleteAllFutures)
- mutateRet <- Future.sequence(mutateEdgeFutures)
- } yield mutateRet
-
- composed.map(_.forall(identity)).map { ret => idxs.map( idx => idx -> ret) }
- }
-
- Future.sequence(mutateEdges).map { squashedRets =>
- squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
- }
- }
-
- def mutateVertex(vertex: S2Vertex, withWait: Boolean): Future[Boolean] = {
- if (vertex.op == GraphUtil.operations("delete")) {
- writeToStorage(vertex.hbaseZkAddr,
- vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
- } else if (vertex.op == GraphUtil.operations("deleteAll")) {
- logger.info(s"deleteAll for vertex is truncated. $vertex")
- Future.successful(true) // Ignore withWait parameter, because deleteAll operation may takes long time
- } else {
- writeToStorage(vertex.hbaseZkAddr, buildPutsAll(vertex), withWait)
- }
- }
-
- def mutateVertices(vertices: Seq[S2Vertex],
- withWait: Boolean = false): Future[Seq[Boolean]] = {
- val futures = vertices.map { vertex => mutateVertex(vertex, withWait) }
- Future.sequence(futures)
- }
-
-
- def mutateEdgesInner(edges: Seq[S2Edge],
- checkConsistency: Boolean,
- withWait: Boolean): Future[Boolean] = {
- assert(edges.nonEmpty)
- // TODO:: remove after code review: unreachable code
- if (!checkConsistency) {
-
- val zkQuorum = edges.head.innerLabel.hbaseZkAddr
- val futures = edges.map { edge =>
- val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = increments(edgeUpdate.deepCopy)
- val mutations =
- indexedEdgeMutations(edgeUpdate.deepCopy) ++ snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-
- if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-
- writeToStorage(zkQuorum, mutations, withWait)
- }
- Future.sequence(futures).map { rets => rets.forall(identity) }
- } else {
- fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
- retry(1)(edges, 0, snapshotEdgeOpt)
- }
- }
- }
-
- def exponentialBackOff(tryNum: Int) = {
- // time slot is divided by 10 ms
- val slot = 10
- Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt)
- }
-
- def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge]): Future[Boolean] = {
- if (tryNum >= MaxRetryNum) {
- edges.foreach { edge =>
- logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
- }
-
- Future.successful(false)
- } else {
- val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt)
- future.onSuccess {
- case success =>
- logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
- }
- future recoverWith {
- case FetchTimeoutException(retryEdge) =>
- logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
- /* fetch failed. re-fetch should be done */
- fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
- retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
- }
-
- case PartialFailureException(retryEdge, failedStatusCode, faileReason) =>
- val status = failedStatusCode match {
- case 0 => "AcquireLock failed."
- case 1 => "Mutation failed."
- case 2 => "Increment failed."
- case 3 => "ReleaseLock failed."
- case 4 => "Unknown"
- }
- logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
-
- /* retry logic */
- val promise = Promise[Boolean]
- val backOff = exponentialBackOff(tryNum)
- scheduledThreadPool.schedule(new Runnable {
- override def run(): Unit = {
- val future = if (failedStatusCode == 0) {
- // acquire Lock failed. other is mutating so this thread needs to re-fetch snapshotEdge.
- /* fetch failed. re-fetch should be done */
- fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
- retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
- }
- } else {
- // partial failure occur while self locked and mutating.
- // assert(fetchedSnapshotEdgeOpt.nonEmpty)
- retry(tryNum + 1)(edges, failedStatusCode, fetchedSnapshotEdgeOpt)
- }
- promise.completeWith(future)
- }
-
- }, backOff, TimeUnit.MILLISECONDS)
- promise.future
-
- case ex: Exception =>
- logger.error("Unknown exception", ex)
- Future.successful(false)
- }
- }
- }
-
- protected def commitUpdate(edges: Seq[S2Edge],
- statusCode: Byte,
- fetchedSnapshotEdgeOpt: Option[S2Edge]): Future[Boolean] = {
-// Future.failed(new PartialFailureException(edges.head, 0, "ahahah"))
- assert(edges.nonEmpty)
-// assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined)
-
- statusCode match {
- case 0 =>
- fetchedSnapshotEdgeOpt match {
- case None =>
- /*
- * no one has ever mutated this SN.
- * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
- * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
- * lock = (squashedEdge, pendingE)
- * releaseLock = (edgeMutate.newSnapshotEdge, None)
- */
- val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
-
- assert(edgeMutate.newSnapshotEdge.isDefined)
-
- val lockTs = Option(System.currentTimeMillis())
- val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = squashedEdge.ts + 1)
- val lockSnapshotEdge = squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
- val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
- pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
-
- commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
-
- case Some(snapshotEdge) =>
- snapshotEdge.pendingEdgeOpt match {
- case None =>
- /*
- * others finished commit on this SN. but there is no contention.
- * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
- * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
- * lock = (snapshotEdge, pendingE)
- * releaseLock = (edgeMutate.newSnapshotEdge, None)
- */
- val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
- if (edgeMutate.newSnapshotEdge.isEmpty) {
- logger.debug(s"drop this requests: \n${edges.map(_.toLogString).mkString("\n")}")
- Future.successful(true)
- } else {
- val lockTs = Option(System.currentTimeMillis())
- val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1)
- val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
- val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
- pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
- commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
- }
- case Some(pendingEdge) =>
- val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
- if (isLockExpired) {
- /*
- * if pendingEdge.ts == snapshotEdge.ts =>
- * (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
- * else =>
- * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, Seq(pendingEdge))
- * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1)
- * lock = (snapshotEdge, pendingE)
- * releaseLock = (edgeMutate.newSnapshotEdge, None)
- */
- logger.debug(s"${pendingEdge.toLogString} has been expired.")
- val (squashedEdge, edgeMutate) =
- if (pendingEdge.ts == snapshotEdge.ts) S2Edge.buildOperation(None, pendingEdge +: edges)
- else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges)
-
- val lockTs = Option(System.currentTimeMillis())
- val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1)
- val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge))
- val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
- pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
-
- commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
- } else {
- /*
- * others finished commit on this SN and there is currently contention.
- * this can't be proceed so retry from re-fetch.
- * throw EX
- */
- val (squashedEdge, _) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
- Future.failed(new PartialFailureException(squashedEdge, 0, s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]"))
- }
- }
-
- }
- case _ =>
-
- /*
- * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock
- */
-
- /*
- * this succeed to lock this SN. keep doing on commit process.
- * if SN.isEmpty =>
- * no one has ever succeeded in committing on this SN.
- * this is first mutation try on this SN.
- * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges)
- * else =>
- * assert(SN.pendingEdgeOpt.isEmpty) no-fetch after acquire lock when self retrying.
- * there has been success commit on this SN.
- * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges)
- * releaseLock = (edgeMutate.newSnapshotEdge, None)
- */
- val _edges =
- if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges
- else edges
- val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges)
- val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2
- val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match {
- case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion)
- case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion)
- }
- // lockSnapshotEdge will be ignored.
- commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, releaseLockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
- }
- }
- /**
- * orchestrate commit process.
- * we separate into 4 step to avoid duplicating each step over and over.
- *
- * @param statusCode: current statusCode of this thread to process edges.
- * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge.
- * @param fetchedSnapshotEdgeOpt: fetched snapshotEdge from storage before commit process begin.
- * @param lockSnapshotEdge: lockEdge that hold necessary data to lock this snapshotEdge for this thread.
- * @param releaseLockSnapshotEdge: releaseLockEdge that will remove lock by storing new final merged states
- * all from current request edges and fetched snapshotEdge.
- * @param edgeMutate: mutations for indexEdge and snapshotEdge.
- * @return
- */
- protected def commitProcess(statusCode: Byte,
- squashedEdge: S2Edge,
- fetchedSnapshotEdgeOpt:Option[S2Edge],
- lockSnapshotEdge: SnapshotEdge,
- releaseLockSnapshotEdge: SnapshotEdge,
- edgeMutate: EdgeMutate): Future[Boolean] = {
- for {
- locked <- acquireLock(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge)
- mutated <- commitIndexEdgeMutations(locked, statusCode, squashedEdge, edgeMutate)
- incremented <- commitIndexEdgeDegreeMutations(mutated, statusCode, squashedEdge, edgeMutate)
- lockReleased <- releaseLock(incremented, statusCode, squashedEdge, releaseLockSnapshotEdge)
- } yield lockReleased
- }
-
- case class PartialFailureException(edge: S2Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason)
-
- protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = {
- val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n")
- logger.debug(msg)
- }
-
- protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = {
- val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
- s"${edgeMutate.toLogString}").mkString("\n")
- logger.debug(msg)
- }
-
- /**
- * try to acquire lock on storage for this given snapshotEdge(lockEdge).
- *
- * @param statusCode: current statusCode of this thread to process edges.
- * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug
- * @param fetchedSnapshotEdgeOpt: fetched snapshot edge from storage.
- * @param lockEdge: lockEdge to build RPC request(compareAndSet) into Storage.
- * @return
- */
- protected def acquireLock(statusCode: Byte,
- squashedEdge: S2Edge,
- fetchedSnapshotEdgeOpt: Option[S2Edge],
- lockEdge: SnapshotEdge): Future[Boolean] = {
- if (statusCode >= 1) {
- logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}")
- Future.successful(true)
- } else {
- val p = Random.nextDouble()
- if (p < FailProb) {
- Future.failed(new PartialFailureException(squashedEdge, 0, s"$p"))
- } else {
- val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
- val oldPut = fetchedSnapshotEdgeOpt.map(e => snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head)
- writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception =>
- logger.error(s"AcquireLock RPC Failed.")
- throw new PartialFailureException(squashedEdge, 0, "AcquireLock RPC Failed")
- }.map { ret =>
- if (ret) {
- val log = Seq(
- "\n",
- "=" * 50,
- s"[Success]: acquireLock",
- s"[RequestEdge]: ${squashedEdge.toLogString}",
- s"[LockEdge]: ${lockEdge.toLogString()}",
- s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
- "=" * 50, "\n").mkString("\n")
-
- logger.debug(log)
- // debug(ret, "acquireLock", edge.toSnapshotEdge)
- } else {
- throw new PartialFailureException(squashedEdge, 0, "hbase fail.")
- }
- true
- }
- }
- }
- }
-
-
- /**
- * change this snapshot's state on storage from locked into committed by
- * storing new merged states on storage. merge state come from releaseLockEdge.
- * note that releaseLock return Future.failed on predicate failure.
- *
- * @param predicate: indicate if this releaseLock phase should be proceed or not.
- * @param statusCode: releaseLock do not use statusCode, only for debug.
- * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug
- * @param releaseLockEdge: final merged states if all process goes well.
- * @return
- */
- protected def releaseLock(predicate: Boolean,
- statusCode: Byte,
- squashedEdge: S2Edge,
- releaseLockEdge: SnapshotEdge): Future[Boolean] = {
- if (!predicate) {
- Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed."))
- } else {
- val p = Random.nextDouble()
- if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 3, s"$p"))
- else {
- val releaseLockEdgePuts = snapshotEdgeSerializer(releaseLockEdge).toKeyValues
- writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith {
- case ex: Exception =>
- logger.error(s"ReleaseLock RPC Failed.")
- throw new PartialFailureException(squashedEdge, 3, "ReleaseLock RPC Failed")
- }.map { ret =>
- if (ret) {
- debug(ret, "releaseLock", squashedEdge.toSnapshotEdge)
- } else {
- val msg = Seq("\nFATAL ERROR\n",
- "=" * 50,
- squashedEdge.toLogString,
- releaseLockEdgePuts,
- "=" * 50,
- "\n"
- )
- logger.error(msg.mkString("\n"))
- // error(ret, "releaseLock", edge.toSnapshotEdge)
- throw new PartialFailureException(squashedEdge, 3, "hbase fail.")
- }
- true
- }
- }
- }
- }
-
- /**
- *
- * @param predicate: indicate if this commitIndexEdgeMutations phase should be proceed or not.
- * @param statusCode: current statusCode of this thread to process edges.
- * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug
- * @param edgeMutate: actual collection of mutations. note that edgeMutate contains snapshotEdge mutations,
- * but in here, we only use indexEdge's mutations.
- * @return
- */
- protected def commitIndexEdgeMutations(predicate: Boolean,
- statusCode: Byte,
- squashedEdge: S2Edge,
- edgeMutate: EdgeMutate): Future[Boolean] = {
- if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed."))
- else {
- if (statusCode >= 2) {
- logger.debug(s"skip mutate: [$statusCode]\n${squashedEdge.toLogString}")
- Future.successful(true)
- } else {
- val p = Random.nextDouble()
- if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 1, s"$p"))
- else
- writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, indexedEdgeMutations(edgeMutate), withWait = true).map { ret =>
- if (ret) {
- debug(ret, "mutate", squashedEdge.toSnapshotEdge, edgeMutate)
- } else {
- throw new PartialFailureException(squashedEdge, 1, "hbase fail.")
- }
- true
- }
- }
- }
- }
-
- /**
- *
- * @param predicate: indicate if this commitIndexEdgeMutations phase should be proceed or not.
- * @param statusCode: current statusCode of this thread to process edges.
- * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug
- * @param edgeMutate: actual collection of mutations. note that edgeMutate contains snapshotEdge mutations,
- * but in here, we only use indexEdge's degree mutations.
- * @return
- */
- protected def commitIndexEdgeDegreeMutations(predicate: Boolean,
- statusCode: Byte,
- squashedEdge: S2Edge,
- edgeMutate: EdgeMutate): Future[Boolean] = {
-
- def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
- writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, kvs, withWait = withWait).map { ret =>
- if (ret) {
- debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate)
- } else {
- throw new PartialFailureException(squashedEdge, 2, "hbase fail.")
- }
- true
- }
- }
-
- if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2, "predicate failed."))
- if (statusCode >= 3) {
- logger.debug(s"skip increment: [$statusCode]\n${squashedEdge.toLogString}")
- Future.successful(true)
- } else {
- val p = Random.nextDouble()
- if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p"))
- else {
- val (bufferIncr, nonBufferIncr) = increments(edgeMutate.deepCopy)
-
- if (bufferIncr.nonEmpty) _write(bufferIncr, withWait = false)
- _write(nonBufferIncr, withWait = true)
- }
- }
- }
-
- /** end of methods for consistency */
-
- def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge],
- newEdge: S2Edge, edgeMutate: EdgeMutate) =
- Seq("----------------------------------------------",
- s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}",
- s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}",
- s"newEdge: ${newEdge.toLogString}",
- s"mutation: \n${edgeMutate.toLogString}",
- "----------------------------------------------").mkString("\n")
-
-
- /** Delete All */
- def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
- requestTs: Long,
- retryNum: Int): Future[Boolean] = {
- if (stepInnerResult.isEmpty) Future.successful(true)
- else {
- val head = stepInnerResult.edgeWithScores.head
- val zkQuorum = head.edge.innerLabel.hbaseZkAddr
- val futures = for {
- edgeWithScore <- stepInnerResult.edgeWithScores
- } yield {
- val edge = edgeWithScore.edge
- val score = edgeWithScore.score
-
- val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-
- val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
- val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- buildIncrementsAsync(indexEdge, -1L)
- }
-
- /* reverted direction */
- val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- buildIncrementsAsync(indexEdge, -1L)
- }
-
- val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-
- writeToStorage(zkQuorum, mutations, withWait = true)
- }
-
- Future.sequence(futures).map { rets => rets.forall(identity) }
- }
- }
-
- /** End Of Delete All */
-
-
-
-
- /** Parsing Logic: parse from kv from Storage into Edge */
- def toEdge[K: CanSKeyValue](kv: K,
- queryRequest: QueryRequest,
- cacheElementOpt: Option[S2Edge],
- parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = {
- logger.debug(s"toEdge: $kv")
-
- try {
- val queryOption = queryRequest.query.queryOption
- val queryParam = queryRequest.queryParam
- val schemaVer = queryParam.label.schemaVersion
- val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
- if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges))
- else indexEdgeOpt
- } catch {
- case ex: Exception =>
- logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex)
- None
- }
- }
-
- def toSnapshotEdge[K: CanSKeyValue](kv: K,
- queryRequest: QueryRequest,
- cacheElementOpt: Option[SnapshotEdge] = None,
- isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = {
-// logger.debug(s"SnapshottoEdge: $kv")
- val queryParam = queryRequest.queryParam
- val schemaVer = queryParam.label.schemaVersion
- val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
-
- if (isInnerCall) {
- snapshotEdgeOpt.flatMap { snapshotEdge =>
- val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
- if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
- else None
- }
- } else {
- snapshotEdgeOpt.flatMap { snapshotEdge =>
- if (snapshotEdge.allPropsDeleted) None
- else {
- val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
- if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
- else None
- }
- }
- }
- }
-
- val dummyCursor: Array[Byte] = Array.empty
-
- def toEdges[K: CanSKeyValue](kvs: Seq[K],
- queryRequest: QueryRequest,
- prevScore: Double = 1.0,
- isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore],
- startOffset: Int = 0,
- len: Int = Int.MaxValue): StepResult = {
-
- val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _
-
- if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
- else {
- val queryOption = queryRequest.query.queryOption
- val queryParam = queryRequest.queryParam
- val labelWeight = queryRequest.labelWeight
- val nextStepOpt = queryRequest.nextStepOpt
- val where = queryParam.where.get
- val label = queryParam.label
- val isDefaultTransformer = queryParam.edgeTransformer.isDefault
- val first = kvs.head
- val kv = first
- val schemaVer = queryParam.label.schemaVersion
- val cacheElementOpt =
- if (queryParam.isSnapshotEdge) None
- else indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), None)
-
- val (degreeEdges, keyValues) = cacheElementOpt match {
- case None => (Nil, kvs)
- case Some(cacheElement) =>
- val head = cacheElement
- if (!head.isDegree) (Nil, kvs)
- else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail)
- }
-
- val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor)
-
- if (!queryOption.ignorePrevStepCache) {
- val edgeWithScores = for {
- (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
- edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
- if where == WhereParser.success || where.filter(edge)
- convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
- } yield {
- val score = edge.rank(queryParam.rank)
- EdgeWithScore(convertedEdge, score, label)
- }
- StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
- } else {
- val degreeScore = 0.0
-
- val edgeWithScores = for {
- (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
- edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
- if where == WhereParser.success || where.filter(edge)
- convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
- } yield {
- val edgeScore = edge.rank(queryParam.rank)
- val score = queryParam.scorePropagateOp match {
- case "plus" => edgeScore + prevScore
- case "divide" =>
- if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
- else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
- case _ => edgeScore * prevScore
- }
- val tsVal = processTimeDecay(queryParam, edge)
- val newScore = degreeScore + score
- EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score = newScore * labelWeight * tsVal, label = label)
- }
-
- val sampled =
- if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
- else edgeWithScores
-
- val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled
-
- StepResult(edgeWithScores = normalized, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
- }
- }
- }
-
- /** End Of Parse Logic */
-
- protected def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2Edge = {
+object Storage {
+ def toRequestEdge(graph: S2Graph)(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2Edge = {
val srcVertex = queryRequest.vertex
val queryParam = queryRequest.queryParam
val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
@@ -988,133 +56,131 @@ abstract class Storage[Q, R](val graph: S2Graph,
}
}
- protected def fetchSnapshotEdgeInner(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
- /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache
- * so use empty cacheKey.
- * */
- val queryParam = QueryParam(labelName = edge.innerLabel.label,
- direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
- tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
- cacheTTLInMillis = -1)
- val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
- val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
- // val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
+}
- fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
- val (edgeOpt, kvOpt) =
- if (kvs.isEmpty) (None, None)
- else {
- val snapshotEdgeOpt = toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
- val _kvOpt = kvs.headOption
- (snapshotEdgeOpt, _kvOpt)
- }
- (queryParam, edgeOpt, kvOpt)
- } recoverWith { case ex: Throwable =>
- logger.error(s"fetchQueryParam failed. fallback return.", ex)
- throw new FetchTimeoutException(s"${edge.toLogString}")
- }
- }
+abstract class Storage[Q](val graph: S2Graph,
+ val config: Config) {
+ /* Storage backend specific resource management */
+ val management: StorageManagement
+
+ /* Physically store given KeyValue into backend storage. */
+ val mutator: StorageWritable
+
+ /*
+ * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
+ * then convert them into Edge/Vertex
+ */
+ val fetcher: StorageReadable[Q]
+
+ /*
+ * Serialize Edge/Vertex, to common KeyValue, SKeyValue that
+ * can be stored aligned to backend storage's physical schema.
+ * Also Deserialize storage backend's KeyValue to SKeyValue.
+ */
+ val serDe: StorageSerDe
+
+ /*
+ * Common helper to translate SKeyValue to Edge/Vertex and vice versa.
+ * Note that it require storage backend specific implementation for serialize/deserialize.
+ */
+ lazy val io: StorageIO = new StorageIO(graph, serDe)
+
+ /*
+ * Common helper to resolve write-write conflict on snapshot edge with same EdgeId.
+ * Note that it require storage backend specific implementations for
+ * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO
+ */
+ lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher)
+
+
+ /** IO **/
+ def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] =
+ serDe.snapshotEdgeSerializer(snapshotEdge)
+
+ def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] =
+ serDe.indexEdgeSerializer(indexEdge)
+
+ def vertexSerializer(vertex: S2Vertex): serde.Serializable[S2Vertex] =
+ serDe.vertexSerializer(vertex)
+ def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] =
+ serDe.snapshotEdgeDeserializer(schemaVer)
- /** end of query */
+ def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable =
+ serDe.indexEdgeDeserializer(schemaVer)
+
+ def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] =
+ serDe.vertexDeserializer(schemaVer)
/** Mutation Builder */
+ def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) =
+ io.increments(edgeMutate)
+ def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+ io.indexedEdgeMutations(edgeMutate)
- /** EdgeMutate */
- def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
- // skip sampling for delete operation
- val deleteMutations = edgeMutate.edgesToDeleteWithIndexOpt.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete, durability = indexEdge.label.durability))
- }
+ def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
+ io.buildIncrementsAsync(indexedEdge, amount)
- val insertMutations = edgeMutate.edgesToInsertWithIndexOpt.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
- }
+ def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
+ io.buildIncrementsCountAsync(indexedEdge, amount)
- deleteMutations ++ insertMutations
- }
+ def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] =
+ io.buildVertexPutsAsync(edge)
def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
- edgeMutate.newSnapshotEdge.map(e => snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = e.label.durability))).getOrElse(Nil)
+ io.snapshotEdgeMutations(edgeMutate)
- def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
- (edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match {
- case (true, true) =>
- /* when there is no need to update. shouldUpdate == false */
- Nil -> Nil
+ def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] =
+ io.buildDegreePuts(edge, degreeVal)
- case (true, false) =>
- /* no edges to delete but there is new edges to insert so increase degree by 1 */
- val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree)
- buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_))
+ def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] =
+ io.buildPutsAll(vertex)
- case (false, true) =>
- /* no edges to insert but there is old edges to delete so decrease degree by 1 */
- val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree)
- buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1))
+ /** Mutation **/
- case (false, false) =>
- /* update on existing edges so no change on degree */
- Nil -> Nil
- }
- }
+ def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+ mutator.writeToStorage(cluster, kvs, withWait)
- /** IndexEdge */
- def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
- val newProps = indexedEdge.updatePropsWithTs()
- newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts))
- val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
- indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
- }
+ def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] =
+ mutator.writeLock(requestKeyValue, expectedOpt)
- def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
- val newProps = indexedEdge.updatePropsWithTs()
- newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts))
- val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
- indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
- }
+ /** Fetch **/
- //TODO: ServiceColumn do not have durability property yet.
- def buildDeleteBelongsToId(vertex: S2Vertex): Seq[SKeyValue] = {
- val kvs = vertexSerializer(vertex).toKeyValues
- val kv = kvs.head
- vertex.belongLabelIds.map { id =>
- kv.copy(qualifier = Bytes.toBytes(S2Vertex.toPropKey(id)), operation = SKeyValue.Delete)
- }
- }
+ def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q = fetcher.buildRequest(queryRequest, edge)
- def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = {
- val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
+ def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q = fetcher.buildRequest(queryRequest, vertex)
- if (storeVertex) {
- if (edge.op == GraphUtil.operations("delete"))
- buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
- else
- vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
- } else {
- Seq.empty
- }
- }
+ def fetches(queryRequests: Seq[QueryRequest],
+ prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] =
+ fetcher.fetches(queryRequests, prevStepEdges)
- def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = {
- edge.propertyInner(LabelMeta.degree.name, degreeVal, edge.ts)
- val kvs = edge.edgesWithIndexValid.flatMap { indexEdge =>
- indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
- }
+ def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = fetcher.fetchKeyValues(rpc)
- kvs
- }
+ def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = fetcher.fetchEdgesAll()
- def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] = {
- vertex.op match {
- case d: Byte if d == GraphUtil.operations("delete") => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
- case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put))
- }
- }
+ def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] = fetcher.fetchVerticesAll()
- def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
+ def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] =
+ fetcher.fetchSnapshotEdgeInner(edge)
+
+ /** Conflict Resolver **/
+ def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] =
+ conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt)
+
+ /** Management **/
+ def flush(): Unit = management.flush()
+ def createTable(config: Config, tableNameStr: String): Unit = management.createTable(config, tableNameStr)
+
+ def truncateTable(config: Config, tableNameStr: String): Unit = management.truncateTable(config, tableNameStr)
+
+ def deleteTable(config: Config, tableNameStr: String): Unit = management.deleteTable(config, tableNameStr)
+
+ def shutdown(): Unit = management.shutdown()
+
+
+ def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
deleted file mode 100644
index 811cf62..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.QueryParam
-import org.apache.s2graph.core.mysqls.{LabelMeta, Label}
-import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs}
-import org.apache.s2graph.core.utils.logger
-
-object StorageDeserializable {
- /** Deserializer */
- def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = {
- val byte = bytes(offset)
- val isInverted = if ((byte & 1) != 0) true else false
- val labelOrderSeq = byte >> 1
- (labelOrderSeq.toByte, isInverted)
- }
-
- def bytesToKeyValues(bytes: Array[Byte],
- offset: Int,
- length: Int,
- schemaVer: String,
- label: Label): (Array[(LabelMeta, InnerValLike)], Int) = {
- var pos = offset
- val len = bytes(pos)
- pos += 1
- val kvs = new Array[(LabelMeta, InnerValLike)](len)
- var i = 0
- while (i < len) {
- val k = label.labelMetaMap(bytes(pos))
- pos += 1
- val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer)
- pos += numOfBytesUsed
- kvs(i) = (k -> v)
- i += 1
- }
- val ret = (kvs, pos)
- // logger.debug(s"bytesToProps: $ret")
- ret
- }
-
- def bytesToKeyValuesWithTs(bytes: Array[Byte],
- offset: Int,
- schemaVer: String,
- label: Label): (Array[(LabelMeta, InnerValLikeWithTs)], Int) = {
- var pos = offset
- val len = bytes(pos)
- pos += 1
- val kvs = new Array[(LabelMeta, InnerValLikeWithTs)](len)
- var i = 0
- while (i < len) {
- val k = label.labelMetaMap(bytes(pos))
- pos += 1
- val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, schemaVer)
- pos += numOfBytesUsed
- kvs(i) = (k -> v)
- i += 1
- }
- val ret = (kvs, pos)
- // logger.debug(s"bytesToProps: $ret")
- ret
- }
-
- def bytesToProps(bytes: Array[Byte],
- offset: Int,
- schemaVer: String): (Array[(LabelMeta, InnerValLike)], Int) = {
- var pos = offset
- val len = bytes(pos)
- pos += 1
- val kvs = new Array[(LabelMeta, InnerValLike)](len)
- var i = 0
- while (i < len) {
- val k = LabelMeta.empty
- val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer)
- pos += numOfBytesUsed
- kvs(i) = (k -> v)
- i += 1
- }
- // logger.error(s"bytesToProps: $kvs")
- val ret = (kvs, pos)
-
- ret
- }
-
- def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset)
-
- def bytesToInt(bytes: Array[Byte], offset: Int): Int = Bytes.toInt(bytes, offset)
-}
-
-trait StorageDeserializable[E] {
- def fromKeyValues[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: Option[E]): Option[E]
-// = {
-// try {
-// Option(fromKeyValuesInner(kvs, cacheElementOpt))
-// } catch {
-// case e: Exception =>
-// logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
-// None
-// }
-// }
-// def fromKeyValuesInner[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: Option[E]): E
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
new file mode 100644
index 0000000..2e11f0b
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -0,0 +1,241 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.S2Graph.{convertEdges, normalize, processTimeDecay, sample}
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.parsers.WhereParser
+import org.apache.s2graph.core.utils.logger
+
+class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
+ val dummyCursor: Array[Byte] = Array.empty
+
+ /** Parsing Logic: parse from kv from Storage into Edge */
+ def toEdge[K: CanSKeyValue](kv: K,
+ queryRequest: QueryRequest,
+ cacheElementOpt: Option[S2Edge],
+ parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = {
+ logger.debug(s"toEdge: $kv")
+
+ try {
+ val queryOption = queryRequest.query.queryOption
+ val queryParam = queryRequest.queryParam
+ val schemaVer = queryParam.label.schemaVersion
+ val indexEdgeOpt = serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
+ if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges))
+ else indexEdgeOpt
+ } catch {
+ case ex: Exception =>
+ logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex)
+ None
+ }
+ }
+
+ def toSnapshotEdge[K: CanSKeyValue](kv: K,
+ queryRequest: QueryRequest,
+ cacheElementOpt: Option[SnapshotEdge] = None,
+ isInnerCall: Boolean,
+ parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = {
+ // logger.debug(s"SnapshottoEdge: $kv")
+ val queryParam = queryRequest.queryParam
+ val schemaVer = queryParam.label.schemaVersion
+ val snapshotEdgeOpt = serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
+
+ if (isInnerCall) {
+ snapshotEdgeOpt.flatMap { snapshotEdge =>
+ val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
+ if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
+ else None
+ }
+ } else {
+ snapshotEdgeOpt.flatMap { snapshotEdge =>
+ if (snapshotEdge.allPropsDeleted) None
+ else {
+ val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
+ if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
+ else None
+ }
+ }
+ }
+ }
+
+ def toEdges[K: CanSKeyValue](kvs: Seq[K],
+ queryRequest: QueryRequest,
+ prevScore: Double = 1.0,
+ isInnerCall: Boolean,
+ parentEdges: Seq[EdgeWithScore],
+ startOffset: Int = 0,
+ len: Int = Int.MaxValue): StepResult = {
+
+ val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _
+
+ if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
+ else {
+ val queryOption = queryRequest.query.queryOption
+ val queryParam = queryRequest.queryParam
+ val labelWeight = queryRequest.labelWeight
+ val nextStepOpt = queryRequest.nextStepOpt
+ val where = queryParam.where.get
+ val label = queryParam.label
+ val isDefaultTransformer = queryParam.edgeTransformer.isDefault
+ val first = kvs.head
+ val kv = first
+ val schemaVer = queryParam.label.schemaVersion
+ val cacheElementOpt =
+ if (queryParam.isSnapshotEdge) None
+ else serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), None)
+
+ val (degreeEdges, keyValues) = cacheElementOpt match {
+ case None => (Nil, kvs)
+ case Some(cacheElement) =>
+ val head = cacheElement
+ if (!head.isDegree) (Nil, kvs)
+ else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail)
+ }
+
+ val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor)
+
+ if (!queryOption.ignorePrevStepCache) {
+ val edgeWithScores = for {
+ (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
+ edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
+ if where == WhereParser.success || where.filter(edge)
+ convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
+ } yield {
+ val score = edge.rank(queryParam.rank)
+ EdgeWithScore(convertedEdge, score, label)
+ }
+ StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
+ } else {
+ val degreeScore = 0.0
+
+ val edgeWithScores = for {
+ (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
+ edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
+ if where == WhereParser.success || where.filter(edge)
+ convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
+ } yield {
+ val edgeScore = edge.rank(queryParam.rank)
+ val score = queryParam.scorePropagateOp match {
+ case "plus" => edgeScore + prevScore
+ case "divide" =>
+ if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+ else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+ case _ => edgeScore * prevScore
+ }
+ val tsVal = processTimeDecay(queryParam, edge)
+ val newScore = degreeScore + score
+ EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score = newScore * labelWeight * tsVal, label = label)
+ }
+
+ val sampled =
+ if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+ else edgeWithScores
+
+ val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled
+
+ StepResult(edgeWithScores = normalized, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
+ }
+ }
+ }
+
+ /** End Of Parse Logic */
+
+
+ /** end of query */
+
+ /** Mutation Builder */
+
+
+ /** EdgeMutate */
+ def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
+ // skip sampling for delete operation
+ val deleteMutations = edgeMutate.edgesToDeleteWithIndexOpt.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete, durability = indexEdge.label.durability))
+ }
+
+ val insertMutations = edgeMutate.edgesToInsertWithIndexOpt.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
+ }
+
+ deleteMutations ++ insertMutations
+ }
+
+ def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+ edgeMutate.newSnapshotEdge.map(e => serDe.snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = e.label.durability))).getOrElse(Nil)
+
+ def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
+ (edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match {
+ case (true, true) =>
+ /* when there is no need to update. shouldUpdate == false */
+ Nil -> Nil
+
+ case (true, false) =>
+ /* no edges to delete but there is new edges to insert so increase degree by 1 */
+ val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree)
+ buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_))
+
+ case (false, true) =>
+ /* no edges to insert but there is old edges to delete so decrease degree by 1 */
+ val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree)
+ buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1))
+
+ case (false, false) =>
+ /* update on existing edges so no change on degree */
+ Nil -> Nil
+ }
+ }
+
+ /** IndexEdge */
+ def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
+ val newProps = indexedEdge.updatePropsWithTs()
+ newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts))
+ val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
+ serDe.indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
+ }
+
+ def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
+ val newProps = indexedEdge.updatePropsWithTs()
+ newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts))
+ val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
+ serDe.indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
+ }
+
+ //TODO: ServiceColumn do not have durability property yet.
+ def buildDeleteBelongsToId(vertex: S2Vertex): Seq[SKeyValue] = {
+ val kvs = serDe.vertexSerializer(vertex).toKeyValues
+ val kv = kvs.head
+ vertex.belongLabelIds.map { id =>
+ kv.copy(qualifier = Bytes.toBytes(S2Vertex.toPropKey(id)), operation = SKeyValue.Delete)
+ }
+ }
+
+ def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = {
+ val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
+
+ if (storeVertex) {
+ if (edge.op == GraphUtil.operations("delete"))
+ buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
+ else
+ serDe.vertexSerializer(edge.srcForVertex).toKeyValues ++ serDe.vertexSerializer(edge.tgtForVertex).toKeyValues
+ } else {
+ Seq.empty
+ }
+ }
+
+ def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = {
+ edge.propertyInner(LabelMeta.degree.name, degreeVal, edge.ts)
+ val kvs = edge.edgesWithIndexValid.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
+ }
+
+ kvs
+ }
+
+ def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] = {
+ vertex.op match {
+ case d: Byte if d == GraphUtil.operations("delete") => serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
+ case _ => serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageManagement.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageManagement.scala
new file mode 100644
index 0000000..da94767
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageManagement.scala
@@ -0,0 +1,35 @@
+package org.apache.s2graph.core.storage
+
+import com.typesafe.config.Config
+
+trait StorageManagement {
+ // NOTE(review): this new file lacks the Apache license header that the
+ // deleted Storage.scala carried — confirm before merge (RAT check will flag it).
+
+ /**
+ * Needs to be called when the client shuts down; responsible for cleaning up
+ * resources such as the client connection into storage.
+ */
+ def flush(): Unit
+
+ /**
+ * Create a table on the storage backend.
+ * If the storage implementation does not support namespaces or tables, this is a no-op.
+ * @param config storage-specific configuration
+ * @param tableNameStr name of the table to create
+ */
+ def createTable(config: Config, tableNameStr: String): Unit
+ /**
+ * Remove all data from the given table.
+ * @param config storage-specific configuration
+ * @param tableNameStr name of the table to truncate
+ */
+ def truncateTable(config: Config, tableNameStr: String): Unit
+ /**
+ * Drop the given table from storage.
+ * @param config storage-specific configuration
+ * @param tableNameStr name of the table to delete
+ */
+ def deleteTable(config: Config, tableNameStr: String): Unit
+
+ /**
+ * Release all resources held by this storage instance.
+ */
+ def shutdown(): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
new file mode 100644
index 0000000..96669ca
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -0,0 +1,62 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.types.VertexId
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait StorageReadable[Q] {
+ val io: StorageIO // conversion helpers between SKeyValue and graph elements
+ /**
+ * Build a storage-specific request (type parameter Q) used by fetchKeyValues.
+ * For example, Asynchbase uses GetRequest/Scanner, so this method is
+ * responsible for building the client request (GetRequest, Scanner) based on
+ * the user-provided query.
+ *
+ * @param queryRequest query to translate into a storage request
+ * @return storage-native request object
+ */
+ def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q
+
+ // Overload for vertex-centric lookups; same contract as the edge variant.
+ def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q
+
+ /**
+ * Responsible for firing parallel fetch calls into storage and creating a
+ * future that will return the merged result.
+ *
+ * @param queryRequests requests to execute, one per step vertex
+ * @param prevStepEdges parent edges from the previous step, keyed by vertex id
+ * @return one StepResult per query request
+ */
+ def fetches(queryRequests: Seq[QueryRequest],
+ prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
+
+ // Execute a single storage-native request and return the raw key-values.
+ def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+
+ // Full-scan accessors; presumably intended for admin/migration use — confirm callers.
+ def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]]
+
+ def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]]
+
+ /**
+ * Fetch the snapshot edge (and its raw kv) corresponding to the given edge
+ * by issuing a single-edge query built from the edge's label and direction.
+ *
+ * NOTE(review): the recoverWith below converts *every* failure into a
+ * FetchTimeoutException, masking the real cause (only the log retains it),
+ * and the log message says "fetchQueryParam" although this method is
+ * fetchSnapshotEdgeInner. Runtime strings deliberately left untouched here.
+ */
+ def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
+ val queryParam = QueryParam(labelName = edge.innerLabel.label,
+ direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
+ tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+ cacheTTLInMillis = -1)
+ val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
+ val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
+
+ fetchKeyValues(buildRequest(queryRequest, edge)).map { kvs =>
+ val (edgeOpt, kvOpt) =
+ if (kvs.isEmpty) (None, None)
+ else {
+ // only the head kv is inspected — a snapshot edge occupies a single kv
+ val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
+ val _kvOpt = kvs.headOption
+ (snapshotEdgeOpt, _kvOpt)
+ }
+ (queryParam, edgeOpt, kvOpt)
+ } recoverWith { case ex: Throwable =>
+ logger.error(s"fetchQueryParam failed. fallback return.", ex)
+ throw new FetchTimeoutException(s"${edge.toLogString}")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
new file mode 100644
index 0000000..f973e0f
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
@@ -0,0 +1,59 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.{IndexEdge, S2Graph, S2Vertex, SnapshotEdge}
+import org.apache.s2graph.core.storage.serde.Deserializable
+import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable
+
+trait StorageSerDe {
+ /**
+ * Compatibility table
+ * | label schema version | snapshot edge | index edge | vertex | note |
+ * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue |
+ * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue |
+ * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema |
+ * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | experimental schema. use scanner instead of get |
+ *
+ */
+
+ /**
+ * Create a serializer that knows how to convert the given snapshotEdge into
+ * kvs: Seq[SKeyValue] so we can store these kvs.
+ *
+ * @param snapshotEdge : snapshotEdge to serialize
+ * @return serializer implementation for StorageSerializable which has toKeyValues returning Seq[SKeyValue]
+ */
+ def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge]
+
+ /**
+ * Create a serializer that knows how to convert the given indexEdge into kvs: Seq[SKeyValue].
+ *
+ * @param indexEdge : indexEdge to serialize
+ * @return serializer implementation
+ */
+ def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge]
+
+ /**
+ * Create a serializer that knows how to convert the given vertex into kvs: Seq[SKeyValue].
+ *
+ * @param vertex : vertex to serialize
+ * @return serializer implementation
+ */
+ def vertexSerializer(vertex: S2Vertex): serde.Serializable[S2Vertex]
+
+ /**
+ * Create a deserializer that can parse a stored CanSKeyValue into a snapshotEdge.
+ * Note that each storage implementation should implement an implicit type class
+ * to convert the storage-dependent data type into the common SKeyValue type by implementing CanSKeyValue.
+ *
+ * ex) Asynchbase uses its KeyValue class, and the CanSKeyValue object has an implicit type conversion method.
+ * If any storage uses a different class to represent the stored byte array,
+ * then that storage implementation is responsible for providing an implicit type conversion method on CanSKeyValue.
+ **/
+ def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge]
+
+ // NOTE(review): returns the concrete tall IndexEdgeDeserializable rather than
+ // Deserializable[IndexEdge] like its siblings — confirm this asymmetry is intended.
+ def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable
+
+ // Deserializer counterpart of vertexSerializer, selected by schema version.
+ def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex]
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
deleted file mode 100644
index c1efe7b..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
-import org.apache.s2graph.core.utils.logger
-
-object StorageSerializable {
- /** serializer */
- def propsToBytes(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = {
- val len = props.length
- assert(len < Byte.MaxValue)
- var bytes = Array.fill(1)(len.toByte)
- for ((_, v) <- props) bytes = Bytes.add(bytes, v.bytes)
- bytes
- }
-
- def propsToKeyValues(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = {
- val len = props.length
- assert(len < Byte.MaxValue)
- var bytes = Array.fill(1)(len.toByte)
- for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes)
- bytes
- }
-
- def propsToKeyValuesWithTs(props: Seq[(LabelMeta, InnerValLikeWithTs)]): Array[Byte] = {
- val len = props.length
- assert(len < Byte.MaxValue)
- var bytes = Array.fill(1)(len.toByte)
- for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes)
- bytes
- }
-
- def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = {
- assert(labelOrderSeq < (1 << 6))
- val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
- Array.fill(1)(byte.toByte)
- }
-
- def intToBytes(value: Int): Array[Byte] = Bytes.toBytes(value)
-
- def longToBytes(value: Long): Array[Byte] = Bytes.toBytes(value)
-}
-
-trait StorageSerializable[E] {
- val cf = Serializable.edgeCf
-
- def table: Array[Byte]
- def ts: Long
-
- def toRowKey: Array[Byte]
- def toQualifier: Array[Byte]
- def toValue: Array[Byte]
-
- def toKeyValues: Seq[SKeyValue] = {
- val row = toRowKey
- val qualifier = toQualifier
- val value = toValue
- val kv = SKeyValue(table, row, cf, qualifier, value, ts)
-// logger.debug(s"[SER]: ${kv.toLogString}}")
- Seq(kv)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
new file mode 100644
index 0000000..216aece
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
@@ -0,0 +1,45 @@
+package org.apache.s2graph.core.storage
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait StorageWritable {
+ /**
+ * Decide how to store the given key values Seq[SKeyValue] into storage using the storage's client.
+ * Note that the returned MutateResponse should indicate success only when all writes succeeded.
+ * We assume that each storage implementation has the client as a member variable.
+ *
+ *
+ * @param cluster: where these key values should be stored.
+ * @param kvs: sequence of SKeyValue that need to be stored in storage.
+ * @param withWait: flag to control waiting for the ack from storage.
+ * note that in AsynchbaseStorage(which support asynchronous operations), even with true,
+ * it never block thread, but rather submit work and notified by event loop when storage send ack back.
+ * @return ack message from storage.
+ */
+ def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+ /**
+ * Write requestKeyValue into storage only if the current value stored in storage matches expectedOpt.
+ * Note that we only use SnapshotEdge as the place for a lock, so this method only changes SnapshotEdge.
+ *
+ * The most important thing is that this has to be an 'atomic' operation.
+ * When this operation is mutating requestKeyValue's snapshotEdge, then other threads need to be
+ * either blocked or failed in the write-write conflict case.
+ *
+ * Also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized to
+ * prevent wrong data being read.
+ *
+ * Best is to use the storage's concurrency control (either pessimistic or optimistic) such as transactions or
+ * compareAndSet to synchronize.
+ *
+ * For example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
+ * For storage that does not support concurrency control, the storage implementation
+ * itself can maintain manual locks that synchronize read (fetchSnapshotEdgeKeyValues)
+ * and write (writeLock).
+ * @param requestKeyValue kv to write, targeting the snapshot edge row
+ * @param expectedOpt expected current value; None presumably means "row must be absent" — confirm per implementation
+ * @return ack message from storage
+ */
+ def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
+
+}