You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/06/11 14:15:50 UTC

incubator-s2graph git commit: [S2GRAPH-66]: Optimize toEdge, IndexEdgeDeserializable using mutable Map. use immutable.Map.newBuilder to build merged props.

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master fabb212dd -> 4e40ec7e6


[S2GRAPH-66]: Optimize toEdge, IndexEdgeDeserializable using mutable Map.
  use immutable.Map.newBuilder to build merged props.

JIRA:
  [S2GRAPH-66] https://issues.apache.org/jira/browse/S2GRAPH-66

Pull Request:
  Closes #50

Authors:
  DOYUNG YOON: steamshon@apache.org


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4e40ec7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4e40ec7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4e40ec7e

Branch: refs/heads/master
Commit: 4e40ec7e6a41acd8e46c0815bdf1fe35098dd360
Parents: fabb212
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Jun 11 23:16:31 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Jun 11 23:16:31 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../tall/IndexEdgeDeserializable.scala          | 43 ++++++++++-------
 .../wide/IndexEdgeDeserializable.scala          | 51 +++++++++++---------
 3 files changed, 54 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a7f7512..8ee4761 100644
--- a/CHANGES
+++ b/CHANGES
@@ -65,6 +65,8 @@ Release 0.12.1 - unreleased
 		(Committed by DOYUNG YOON).
 
     S2GRAPH-55: Add param to enable epoll event loop in experimental netty http server (Committed by daewon).
+    
+    S2GRAPH-66: Optimize toEdge, IndexEdgeDeserializable using mutable Map. (Committed by DOYUN GYOON).
 
     S2GRAPH-22: Add missing shebang line to file. 
 		(Contributed by Injun Song<ij...@gmail.com>, committed by DOYUNG YOON)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index f233940..da029ef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -26,6 +26,8 @@ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue,
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
 
+import scala.collection.immutable
+
 class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
    import StorageDeserializable._
 
@@ -107,7 +109,6 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
        IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
      } else {
        // not degree edge
-       val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
 
        val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version)
        pos = endAt
@@ -117,32 +118,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
          TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version)
        }
 
-       val idxProps = for {
-         (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-       } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v
-
-       val idxPropsMap = idxProps.toMap
-
-       val tgtVertexId =
-         idxPropsMap.get(LabelMeta.toSeq) match {
-           case None => tgtVertexIdRaw
-           case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
-         }
+       val allProps = immutable.Map.newBuilder[Byte, InnerValLike]
+       /** process index props */
+       val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
 
-       val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+       for {
+         (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+       } {
+         if (k == LabelMeta.degreeSeq) allProps += k -> v
+         else allProps += seq -> v
+       }
+       /** process props */
+       if (op == GraphUtil.operations("incrementCount")) {
          //        val countVal = Bytes.toLong(kv.value)
          val countVal = bytesToLongFunc(kv.value, 0)
-         val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
-         (dummyProps, 8)
+         allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
        } else {
-         bytesToKeyValues(kv.value, 0, kv.value.length, version)
+         val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+         props.foreach { case (k, v) =>
+           allProps += (k -> v)
+         }
        }
-
-       val _mergedProps = (idxProps ++ props).toMap
+       val _mergedProps = allProps.result()
        val mergedProps =
          if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
          else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
 
+       val tgtVertexId =
+         mergedProps.get(LabelMeta.toSeq) match {
+           case None => tgtVertexIdRaw
+           case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+         }
+
        val ts = kv.timestamp
        IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index d8bef97..20770c0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -25,6 +25,8 @@ import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
 
+import scala.collection.immutable
+
 class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
    import StorageDeserializable._
 
@@ -89,42 +91,43 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
        if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
        else parseQualifier(kv, version)
 
-     val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+     val allProps = immutable.Map.newBuilder[Byte, InnerValLike]
+
+     /** process index props */
+     val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+     for {
+       (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+     } {
+       if (k == LabelMeta.degreeSeq) allProps += k -> v
+       else allProps += seq -> v
+     }
+
+     /** process props */
+     if (op == GraphUtil.operations("incrementCount")) {
        //      val countVal = Bytes.toLong(kv.value)
        val countVal = bytesToLongFunc(kv.value, 0)
-       val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
-       (dummyProps, 8)
+       allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
      } else if (kv.qualifier.isEmpty) {
-       parseDegreeValue(kv, version)
+       val countVal = bytesToLongFunc(kv.value, 0)
+       allProps += (LabelMeta.degreeSeq -> InnerVal.withLong(countVal, version))
      } else {
-       parseValue(kv, version)
+       val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+       props.foreach { case (k, v) =>
+         allProps += (k -> v)
+       }
      }
 
-     val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-
-     //    assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
-
-     val idxProps = for {
-       (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-     } yield {
-         if (k == LabelMeta.degreeSeq) k -> v
-         else seq -> v
-       }
+     val _mergedProps = allProps.result()
+     val mergedProps =
+       if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+       else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
 
-     val idxPropsMap = idxProps.toMap
      val tgtVertexId = if (tgtVertexIdInQualifier) {
-       idxPropsMap.get(LabelMeta.toSeq) match {
+       mergedProps.get(LabelMeta.toSeq) match {
          case None => tgtVertexIdRaw
          case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
        }
      } else tgtVertexIdRaw
-
-     val _mergedProps = (idxProps ++ props).toMap
-     val mergedProps =
-       if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-       else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
-
      //    logger.error(s"$mergedProps")
      //    val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong