You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/06/11 14:15:50 UTC
incubator-s2graph git commit: [S2GRAPH-66]: Optimize toEdge,
IndexEdgeDeserializable using mutable Map. use
immutable.Map.newBuilder to build merged props.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master fabb212dd -> 4e40ec7e6
[S2GRAPH-66]: Optimize toEdge, IndexEdgeDeserializable using mutable Map.
use immutable.Map.newBuilder to build merged props.
JIRA:
[S2GRAPH-66] https://issues.apache.org/jira/browse/S2GRAPH-66
Pull Request:
Closes #50
Authors:
DOYUNG YOON: steamshon@apache.org
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4e40ec7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4e40ec7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4e40ec7e
Branch: refs/heads/master
Commit: 4e40ec7e6a41acd8e46c0815bdf1fe35098dd360
Parents: fabb212
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Jun 11 23:16:31 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Jun 11 23:16:31 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tall/IndexEdgeDeserializable.scala | 43 ++++++++++-------
.../wide/IndexEdgeDeserializable.scala | 51 +++++++++++---------
3 files changed, 54 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a7f7512..8ee4761 100644
--- a/CHANGES
+++ b/CHANGES
@@ -65,6 +65,8 @@ Release 0.12.1 - unreleased
(Committed by DOYUNG YOON).
S2GRAPH-55: Add param to enable epoll event loop in experimental netty http server (Committed by daewon).
+
+ S2GRAPH-66: Optimize toEdge, IndexEdgeDeserializable using mutable Map. (Committed by DOYUN GYOON).
S2GRAPH-22: Add missing shebang line to file.
(Contributed by Injun Song<ij...@gmail.com>, committed by DOYUNG YOON)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index f233940..da029ef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -26,6 +26,8 @@ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue,
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+import scala.collection.immutable
+
class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
import StorageDeserializable._
@@ -107,7 +109,6 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
} else {
// not degree edge
- val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version)
pos = endAt
@@ -117,32 +118,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version)
}
- val idxProps = for {
- (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
- } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v
-
- val idxPropsMap = idxProps.toMap
-
- val tgtVertexId =
- idxPropsMap.get(LabelMeta.toSeq) match {
- case None => tgtVertexIdRaw
- case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
- }
+ val allProps = immutable.Map.newBuilder[Byte, InnerValLike]
+ /** process index props */
+ val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
- val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+ for {
+ (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+ } {
+ if (k == LabelMeta.degreeSeq) allProps += k -> v
+ else allProps += seq -> v
+ }
+ /** process props */
+ if (op == GraphUtil.operations("incrementCount")) {
// val countVal = Bytes.toLong(kv.value)
val countVal = bytesToLongFunc(kv.value, 0)
- val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
- (dummyProps, 8)
+ allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
} else {
- bytesToKeyValues(kv.value, 0, kv.value.length, version)
+ val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+ props.foreach { case (k, v) =>
+ allProps += (k -> v)
+ }
}
-
- val _mergedProps = (idxProps ++ props).toMap
+ val _mergedProps = allProps.result()
val mergedProps =
if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+ val tgtVertexId =
+ mergedProps.get(LabelMeta.toSeq) match {
+ case None => tgtVertexIdRaw
+ case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+ }
+
val ts = kv.timestamp
IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index d8bef97..20770c0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -25,6 +25,8 @@ import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+import scala.collection.immutable
+
class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
import StorageDeserializable._
@@ -89,42 +91,43 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
else parseQualifier(kv, version)
- val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+ val allProps = immutable.Map.newBuilder[Byte, InnerValLike]
+
+ /** process index props */
+ val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+ for {
+ (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+ } {
+ if (k == LabelMeta.degreeSeq) allProps += k -> v
+ else allProps += seq -> v
+ }
+
+ /** process props */
+ if (op == GraphUtil.operations("incrementCount")) {
// val countVal = Bytes.toLong(kv.value)
val countVal = bytesToLongFunc(kv.value, 0)
- val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
- (dummyProps, 8)
+ allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
} else if (kv.qualifier.isEmpty) {
- parseDegreeValue(kv, version)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ allProps += (LabelMeta.degreeSeq -> InnerVal.withLong(countVal, version))
} else {
- parseValue(kv, version)
+ val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+ props.foreach { case (k, v) =>
+ allProps += (k -> v)
+ }
}
- val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-
- // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
-
- val idxProps = for {
- (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
- } yield {
- if (k == LabelMeta.degreeSeq) k -> v
- else seq -> v
- }
+ val _mergedProps = allProps.result()
+ val mergedProps =
+ if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+ else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
- val idxPropsMap = idxProps.toMap
val tgtVertexId = if (tgtVertexIdInQualifier) {
- idxPropsMap.get(LabelMeta.toSeq) match {
+ mergedProps.get(LabelMeta.toSeq) match {
case None => tgtVertexIdRaw
case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
}
} else tgtVertexIdRaw
-
- val _mergedProps = (idxProps ++ props).toMap
- val mergedProps =
- if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
- else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
-
// logger.error(s"$mergedProps")
// val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong