You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/07/01 15:27:59 UTC
[08/46] incubator-s2graph git commit: bug fix on Edge.vertices.
bug fix on Edge.vertices.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/350e2e6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/350e2e6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/350e2e6a
Branch: refs/heads/master
Commit: 350e2e6a2ade36ee11a29a0bbb1bacf2b2d7e345
Parents: 4425859
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Apr 10 23:04:10 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Apr 10 23:05:32 2017 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/QueryResult.scala | 44 +++-
.../scala/org/apache/s2graph/core/S2Edge.scala | 31 ++-
.../scala/org/apache/s2graph/core/S2Graph.scala | 43 ++--
.../org/apache/s2graph/core/S2Vertex.scala | 3 +-
.../s2graph/core/mysqls/ServiceColumn.scala | 3 +-
.../apache/s2graph/core/storage/Storage.scala | 3 +-
.../core/storage/hbase/AsynchbaseStorage.scala | 31 ++-
.../serde/vertex/VertexDeserializable.scala | 5 +-
.../serde/vertex/VertexSerializable.scala | 4 +-
.../s2graph/core/utils/SafeUpdateCache.scala | 4 +-
.../core/tinkerpop/S2GraphProvider.scala | 19 +-
.../core/tinkerpop/structure/S2GraphTest.scala | 254 ++++++++++++-------
12 files changed, 281 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index bad8361..a7f485c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -27,24 +27,42 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Seq, mutable}
object QueryResult {
+ def fromVertices(graph: S2Graph, vertices: Seq[S2Vertex], queryParams: Seq[QueryParam]): StepResult = {
+ val edgeWithScores = vertices.flatMap { vertex =>
+ queryParams.map { queryParam =>
+ val label = queryParam.label
+ val currentTs = System.currentTimeMillis()
+ val propsWithTs = Map(LabelMeta.timestamp ->
+ InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs))
+
+ val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
+ val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label)
+ edgeWithScore
+
+ }
+ }
+ StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false)
+ }
+
def fromVertices(graph: S2Graph,
query: Query): StepResult = {
if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
StepResult.Empty
} else {
- val queryParam = query.steps.head.queryParams.head
- val label = queryParam.label
- val currentTs = System.currentTimeMillis()
- val propsWithTs = Map(LabelMeta.timestamp ->
- InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs))
- val edgeWithScores = for {
- vertex <- query.vertices
- } yield {
- val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
- val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label)
- edgeWithScore
- }
- StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false)
+ fromVertices(graph, query.vertices, query.steps.head.queryParams)
+// val queryParam = query.steps.head.queryParams.head
+// val label = queryParam.label
+// val currentTs = System.currentTimeMillis()
+// val propsWithTs = Map(LabelMeta.timestamp ->
+// InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs))
+// val edgeWithScores = for {
+// vertex <- query.vertices
+// } yield {
+// val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
+// val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label)
+// edgeWithScore
+// }
+// StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 162ada5..6321dd5 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -596,18 +596,26 @@ case class S2Edge(innerGraph: S2Graph,
override def vertices(direction: Direction): util.Iterator[structure.Vertex] = {
val arr = new util.ArrayList[Vertex]()
+
direction match {
case Direction.OUT =>
- val newVertexId = VertexId(ServiceColumn.findById(srcForVertex.id.colId), srcForVertex.innerId)
- arr.add(srcVertex.copy(id = newVertexId))
-// arr.add(srcVertex)
+ val newVertexId = this.direction match {
+ case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
+ case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
+ case _ => throw new IllegalArgumentException("direction can only be out/in.")
+ }
+ innerGraph.getVertex(newVertexId).foreach(arr.add)
case Direction.IN =>
- val newVertexId = VertexId(ServiceColumn.findById(tgtForVertex.id.colId), tgtForVertex.innerId)
- arr.add(tgtVertex.copy(id = newVertexId))
-// arr.add(tgtVertex)
+ val newVertexId = this.direction match {
+ case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
+ case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
+ case _ => throw new IllegalArgumentException("direction can only be out/in.")
+ }
+ innerGraph.getVertex(newVertexId).foreach(arr.add)
case _ =>
- arr.add(srcVertex)
- arr.add(tgtVertex)
+ import scala.collection.JavaConversions._
+ vertices(Direction.OUT).foreach(arr.add)
+ vertices(Direction.IN).foreach(arr.add)
}
arr.iterator()
}
@@ -674,11 +682,8 @@ case class S2Edge(innerGraph: S2Graph,
override def id(): AnyRef = {
// NOTE: xxxForVertex makes direction to be "out"
- if (this.innerLabel.consistencyLevel == "strong") {
- EdgeId(srcForVertex.innerId, tgtForVertex.innerId, label(), direction, 0)
- } else {
- EdgeId(srcForVertex.innerId, tgtForVertex.innerId, label(), direction, ts)
- }
+ val timestamp = if (this.innerLabel.consistencyLevel == "string") 0l else ts
+ EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), direction, timestamp)
}
override def label(): String = innerLabel.label
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 2771415..7e037b8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -70,7 +70,7 @@ object S2Graph {
"cache.ttl.seconds" -> java.lang.Integer.valueOf(60),
"hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
"hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
- "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000),
+ "hbase.rpc.timeout" -> java.lang.Integer.valueOf(60000),
"max.retry.number" -> java.lang.Integer.valueOf(100),
"lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
"max.back.off" -> java.lang.Integer.valueOf(100),
@@ -530,8 +530,9 @@ object S2Graph {
@Graph.OptIn(Graph.OptIn.SUITE_STRUCTURE_STANDARD)
@Graph.OptOuts(value = Array(
- // new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest", method="*", reason="no"), // pass
- // new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphConstructionTest", method="*", reason="no"), // pass
+// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest", method="*", reason="no"), // pass
+// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphConstructionTest", method="*", reason="no"), // pass
+
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.FeatureSupportTest", method="*", reason="no"), // pass
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.PropertyTest", method="*", reason="no"), // pass
@@ -1370,22 +1371,22 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
snapshotEdge
}
-
- /**
- * internal helper to actually store a single edge based on given peramters.
- *
- * Note that this is used from S2Vertex to implement blocking interface from Tp3.
- * Once tp3 provide AsyncStep, then this can be changed to return Java's CompletableFuture.
- *
- * @param srcVertex
- * @param tgtVertex
- * @param labelName
- * @param direction
- * @param props
- * @param ts
- * @param operation
- * @return
- */
+//
+// /**
+// * internal helper to actually store a single edge based on given peramters.
+// *
+// * Note that this is used from S2Vertex to implement blocking interface from Tp3.
+// * Once tp3 provide AsyncStep, then this can be changed to return Java's CompletableFuture.
+// *
+// * @param srcVertex
+// * @param tgtVertex
+// * @param labelName
+// * @param direction
+// * @param props
+// * @param ts
+// * @param operation
+// * @return
+// */
// private[core] def addEdgeInner(srcVertex: S2Vertex,
// tgtVertex: S2Vertex,
// labelName: String,
@@ -1466,6 +1467,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val queryParams = labelNameWithDirs.map { case (l, direction) =>
QueryParam(labelName = l, direction = direction)
}
+
val query = Query.toQuery(Seq(vertex), queryParams)
getEdges(query).map { stepResult =>
@@ -1529,8 +1531,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
if (edgeIds.isEmpty) {
// FIXME
- val edges = Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
- edges.filterNot(_.isDegree).filterNot(_.direction == "in")
+ Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
} else {
Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
index e13f581..b80a54c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -209,7 +209,8 @@ case class S2Vertex(graph: S2Graph,
val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
val edge = graph.newEdge(this, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
- val future = graph.mutateEdges(edge.relatedEdges, withWait = true)
+ // edge.relatedEdges
+ val future = graph.mutateEdges(Seq(edge), withWait = true)
Await.ready(future, graph.WaitTimeout)
edge
} catch {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index 8614132..32ca653 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -25,7 +25,8 @@ package org.apache.s2graph.core.mysqls
import org.apache.s2graph.core.JSONParser
import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.types.{HBaseType, InnerValLikeWithTs, InnerValLike}
+import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
import play.api.libs.json.Json
import scalikejdbc._
object ServiceColumn extends Model[ServiceColumn] {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index a9b523c..a8dec7e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -285,6 +285,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
val queryParam = QueryParam.Empty
val q = Query.toQuery(Seq(vertex), Seq(queryParam))
val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+
fetchVertexKeyValues(queryRequest).map { kvs =>
fromResult(kvs, vertex.serviceColumn.schemaVersion)
} recoverWith { case ex: Throwable =>
@@ -321,7 +322,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
mutateRet <- Future.sequence(mutateEdgeFutures)
} yield mutateRet
- composed.map(_.forall(identity)).map { ret => idxs.map(idx => idx -> ret) }
+ composed.map(_.forall(identity)).map { ret => idxs.map( idx => idx -> ret) }
}
Future.sequence(mutateEdges).map { squashedRets =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index dab5aae..e41fe27 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -23,7 +23,7 @@ package org.apache.s2graph.core.storage.hbase
import java.util
import java.util.Base64
-import java.util.concurrent.{TimeUnit, ExecutorService, Executors}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import com.stumbleupon.async.{Callback, Deferred}
import com.typesafe.config.Config
@@ -34,17 +34,17 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{TableName, HColumnDescriptor, HBaseConfiguration, HTableDescriptor}
+import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.security.UserGroupInformation
-
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}
-import org.apache.s2graph.core.types.{VertexId, HBaseType}
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
import org.apache.s2graph.core.utils._
import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
import org.hbase.async._
+
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
@@ -96,8 +96,8 @@ object AsynchbaseStorage {
def initLocalHBase(config: Config,
overwrite: Boolean = true): ExecutorService = {
- import java.net.Socket
import java.io.{File, IOException}
+ import java.net.Socket
lazy val hbaseExecutor = {
val executor = Executors.newSingleThreadExecutor()
@@ -277,6 +277,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
override def fetchSnapshotEdgeKeyValues(queryRequest: QueryRequest): Future[Seq[SKeyValue]] = {
val edge = toRequestEdge(queryRequest, Nil)
val rpc = buildRequest(queryRequest, edge)
+
fetchKeyValues(rpc)
}
@@ -445,6 +446,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
val edge = toRequestEdge(queryRequest, parentEdges)
val request = buildRequest(queryRequest, edge)
+
val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes)
@@ -655,10 +657,8 @@ class AsynchbaseStorage(override val graph: S2Graph,
override def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
def fromResult(kvs: Seq[SKeyValue],
version: String): Option[S2Vertex] = {
-
if (kvs.isEmpty) None
else vertexDeserializer.fromKeyValues(kvs, None)
-// .map(S2Vertex(graph, _))
}
val futures = vertices.map { vertex =>
@@ -677,20 +677,25 @@ class AsynchbaseStorage(override val graph: S2Graph,
Future.sequence(futures).map { result => result.toList.flatten }
}
+ //TODO: Limited to 100000 edges per hbase table. fix this later.
override def fetchEdgesAll(): Future[Seq[S2Edge]] = {
val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
+ val distinctLabels = labels.toSet
val scan = AsynchbasePatcher.newScanner(client, hTableName)
scan.setFamily(Serializable.edgeCf)
scan.setMaxVersions(1)
- scan.nextRows(10000).toFuture(emptyKeyValuesLs).map {
+ scan.nextRows(100000).toFuture(emptyKeyValuesLs).map {
case null => Seq.empty
case kvsLs =>
- kvsLs.flatMap { kvs =>
- kvs.flatMap { kv =>
- indexEdgeDeserializer.fromKeyValues(Seq(kv), None)
+ kvsLs.flatMap { kvs =>
+ kvs.flatMap { kv =>
+ val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
+
+ indexEdgeDeserializer.fromKeyValues(Seq(kv), None)
+ .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree)
+ }
}
- }
}
}
@@ -699,6 +704,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
override def fetchVerticesAll(): Future[Seq[S2Vertex]] = {
val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
+ val distinctColumns = columns.toSet
val scan = AsynchbasePatcher.newScanner(client, hTableName)
scan.setFamily(Serializable.vertexCf)
scan.setMaxVersions(1)
@@ -708,6 +714,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
case kvsLs =>
kvsLs.flatMap { kvs =>
vertexDeserializer.fromKeyValues(kvs, None)
+ .filter(v => distinctColumns(v.serviceColumn))
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
index b4a00e6..f8921a8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -23,7 +23,8 @@ import org.apache.s2graph.core.mysqls.{ColumnMeta, Label}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, VertexId}
-import org.apache.s2graph.core.{S2Graph, QueryParam, S2Vertex}
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{QueryParam, S2Graph, S2Vertex}
import scala.collection.mutable.ListBuffer
@@ -33,7 +34,6 @@ class VertexDeserializable(graph: S2Graph,
cacheElementOpt: Option[S2Vertex]): Option[S2Vertex] = {
try {
val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
val kv = kvs.head
val version = HBaseType.DEFAULT_VERSION
val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
@@ -64,6 +64,7 @@ class VertexDeserializable(graph: S2Graph,
assert(maxTs != Long.MinValue)
val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds)
S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
+
Option(vertex)
} catch {
case e: Exception => None
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
index 1dbcd00..ee147f1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -22,6 +22,8 @@ package org.apache.s2graph.core.storage.serde.vertex
import org.apache.s2graph.core.S2Vertex
import org.apache.s2graph.core.storage.StorageSerializable._
import org.apache.s2graph.core.storage.{SKeyValue, Serializable}
+import org.apache.s2graph.core.utils.logger
+
import scala.collection.JavaConverters._
case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[S2Vertex] {
@@ -45,6 +47,6 @@ case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] =
val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(S2Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
(base ++ belongsTo).map { case (qualifier, value) =>
SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
- } toSeq
+ }.toSeq
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index c54dcde..a98104c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -20,10 +20,12 @@
package org.apache.s2graph.core.utils
import java.util.concurrent.atomic.AtomicBoolean
+
import com.google.common.cache.CacheBuilder
+
+import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
-import scala.collection.JavaConversions._
object SafeUpdateCache {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index 18bf998..865717d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@ -34,6 +34,7 @@ class S2GraphProvider extends AbstractGraphProvider {
val config = ConfigFactory.load()
val m = new java.util.HashMap[String, AnyRef]()
m.put(Graph.GRAPH, classOf[S2Graph].getName)
+// m.put("db.default.url", "jdbc:h2:mem:db1;MODE=MYSQL")
m
}
@@ -64,8 +65,6 @@ class S2GraphProvider extends AbstractGraphProvider {
}
private def cleanupSchema(graph: Graph): Unit = {
-// new File("./var/metastore").delete()
-
val s2Graph = graph.asInstanceOf[S2Graph]
val mnt = s2Graph.getManagement()
val defaultService = s2Graph.DefaultService
@@ -139,12 +138,15 @@ class S2GraphProvider extends AbstractGraphProvider {
ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, useCache = false)
}
- // knows props
-// mnt.createLabel("knows", defaultService.serviceName, "vertex", "string", defaultService.serviceName, "vertex", "string",
-// true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}"""))
+ if (testClass.getSimpleName == "DetachedEdgeTest") {
+ mnt.createLabel("knows", defaultService.serviceName, "person", "integer", defaultService.serviceName, "person", "integer",
+ true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}"""))
+ } else {
+ mnt.createLabel("knows", defaultService.serviceName, "vertex", "string", defaultService.serviceName, "vertex", "string",
+ true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}"""))
+ }
+
- mnt.createLabel("knows", defaultService.serviceName, "vertex", "string", defaultService.serviceName, "vertex", "string",
- true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}"""))
// if (testClass.getSimpleName.contains("VertexTest") || (testClass.getSimpleName == "EdgeTest" && testName == "shouldAutotypeDoubleProperties")) {
// mnt.createLabel("knows", defaultService.serviceName, "vertex", "string", defaultService.serviceName, "vertex", "string",
// true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}"""))
@@ -153,7 +155,8 @@ class S2GraphProvider extends AbstractGraphProvider {
// true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}"""))
// }
- val personColumn = Management.createServiceColumn(defaultService.serviceName, "person", "integer", Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("age", "0", "integer"), Prop("location", "-", "string")))
+ val personColumn = Management.createServiceColumn(defaultService.serviceName, "person", "integer",
+ Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("age", "0", "integer"), Prop("location", "-", "string")))
val softwareColumn = Management.createServiceColumn(defaultService.serviceName, "software", "integer", Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("lang", "-", "string")))
val productColumn = Management.createServiceColumn(defaultService.serviceName, "product", "integer", Nil)
val dogColumn = Management.createServiceColumn(defaultService.serviceName, "dog", "integer", Nil)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
index 46f58a8..5454e24 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
@@ -19,21 +19,18 @@
package org.apache.s2graph.core.tinkerpop.structure
-import java.io.File
-
import org.apache.s2graph.core.Management.JsonModel.Prop
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core._
import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{Management, S2Graph, S2Vertex, TestCommonWithModels}
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
-import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, T, Vertex}
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
+import org.apache.tinkerpop.gremlin.structure._
+import org.apache.tinkerpop.gremlin.structure.util.Attachable
+import org.apache.tinkerpop.gremlin.structure.util.detached.{DetachedEdge, DetachedFactory}
import org.scalatest.{FunSuite, Matchers}
class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels {
- import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
initTests()
@@ -187,7 +184,147 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels {
// test("addVertex with empty parameter") {
//
// }
- test("aaa") {
+// test("aaa") {
+// val mnt = graph.management
+// val defaultService = graph.DefaultService
+// val defaultServiceColumn = graph.DefaultColumn
+// val columnNames = Set(defaultServiceColumn.columnName, "person", "software", "product", "dog")
+// val labelNames = Set("knows", "created", "bought", "test", "self", "friends", "friend", "hate", "collaborator", "test1", "test2", "test3", "pets", "walks")
+//
+// Management.deleteService(defaultService.serviceName)
+// columnNames.foreach { columnName =>
+// Management.deleteColumn(defaultServiceColumn.service.serviceName, columnName)
+// }
+// labelNames.foreach { labelName =>
+// Management.deleteLabel(labelName)
+// }
+//
+// val knows = mnt.createLabel("knows",
+// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// true, defaultService.serviceName, Nil, Seq(Prop("since", "0", "integer")), consistencyLevel = "strong", None, None,
+// options = Option("""{"skipReverse": false}"""))
+//
+// val pets = mnt.createLabel("pets",
+// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// true, defaultService.serviceName, Nil, Nil, consistencyLevel = "strong", None, None,
+// options = Option("""{"skipReverse": false}"""))
+//
+// val walks = mnt.createLabel("walks",
+// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// true, defaultService.serviceName, Nil, Seq(Prop("location", "-", "string")), consistencyLevel = "strong", None, None,
+// options = Option("""{"skipReverse": false}"""))
+//
+// val livesWith = mnt.createLabel("livesWith",
+// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// true, defaultService.serviceName, Nil, Nil, consistencyLevel = "strong", None, None,
+// options = Option("""{"skipReverse": false}"""))
+//
+// val friend = mnt.createLabel("friend", defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
+// true, defaultService.serviceName, Nil,
+// Seq(
+// Prop("name", "-", "string"),
+// Prop("location", "-", "string"),
+// Prop("status", "-", "string")
+// ),
+// "strong", None, None,
+// options = Option("""{"skipReverse": false}""")
+// )
+//
+// val v1 = graph.addVertex("name", "marko")
+// val v2 = graph.addVertex("name", "puppy")
+//
+// v1.addEdge("knows", v2, "since", Int.box(2010))
+// v1.addEdge("pets", v2)
+// v1.addEdge("walks", v2, "location", "arroyo")
+// v2.addEdge("knows", v1, "since", Int.box(2010))
+//
+// v1.edges(Direction.BOTH).foreach { e => logger.error(s"[Edge]: $e")}
+// }
+
+// test("bb") {
+// val mnt = graph.management
+// val defaultService = graph.DefaultService
+// val defaultServiceColumn = graph.DefaultColumn
+// val columnNames = Set(defaultServiceColumn.columnName, "person", "software", "product", "dog")
+// val labelNames = Set("knows", "created", "bought", "test", "self", "friends", "friend", "hate", "collaborator", "test1", "test2", "test3", "pets", "walks")
+//
+// Management.deleteService(defaultService.serviceName)
+// columnNames.foreach { columnName =>
+// Management.deleteColumn(defaultServiceColumn.service.serviceName, columnName)
+// }
+// labelNames.foreach { labelName =>
+// Management.deleteLabel(labelName)
+// }
+// val personColumn = Management.createServiceColumn(defaultService.serviceName, "person", "integer",
+// Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("age", "0", "integer"), Prop("location", "-", "string")))
+// val knows = mnt.createLabel("knows",
+// defaultService.serviceName, "person", "integer",
+// defaultService.serviceName, "person", "integer",
+// true, defaultService.serviceName, Nil, Seq(Prop("since", "0", "integer"), Prop("year", "0", "integer")), consistencyLevel = "strong", None, None)
+//
+// val created = mnt.createLabel("created", defaultService.serviceName, "person", "integer", defaultService.serviceName, "software", "integer",
+// true, defaultService.serviceName, Nil, Seq(Prop("weight", "0.0", "double")), "strong", None, None)
+//
+//// val v1 = graph.toVertex(graph.DefaultService.serviceName, "person", 1)
+//// val v4 = graph.toVertex(graph.DefaultService.serviceName, "person", 4)
+//// val ts = System.currentTimeMillis()
+//// val edge = graph.newEdge(v1, v4, knows.get,
+//// GraphUtil.directions("out"), GraphUtil.operations("insert"),
+//// propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, knows.get.schemaVersion)))
+// val v1 = graph.addVertex(T.label, "person", T.id, Int.box(1), "name", "marko")
+// val v4 = graph.addVertex(T.label, "person", T.id, Int.box(4), "name", "vadas")
+//
+// val g = graph.traversal()
+// v1.addEdge("knows", v4, "year", Int.box(2002))
+//
+// def convertToEdgeId(outVertexName: String, edgeLabel: String, inVertexName: String): AnyRef = {
+// g.V().has("name", outVertexName).outE(edgeLabel).as("e").inV.has("name", inVertexName).select[Edge]("e").next().id()
+// }
+//
+// g.V().has("name", "marko").outE("knows").as("e").inV.foreach(e => logger.error(s"[Edge]: $e"))
+//
+//// .as("e").inV.has("name", "vadas").select[Edge]("e").next().id()
+//// g.E(convertToEdgeId("marko", "knows", "vadas")).foreach(e => logger.error(s"[EDGE]: $e"))
+//// val x = DetachedFactory.detach(g.E(convertToEdgeId("marko", "knows", "vadas")).next(), true)
+////// .hashCode()
+//// val y = DetachedFactory.detach(g.E(convertToEdgeId("marko", "knows", "vadas")).next(), true)
+//// .hashCode()
+//// logger.error(s"[X]: $x")
+//// logger.error(s"[Y]: $y")
+//
+//// g.E().foreach(e => logger.error(s"[Edge]: $e"))
+//// g.V().has("name", "marko").outE("knows").foreach(v => logger.error(s"[OutVertex]: $v"))
+//// g.V().has("name", "vadas").inE("knows").foreach(v => logger.error(s"[InVertex]: $v"))
+//
+//
+// @Test
+// @LoadGraphWith(GraphData.MODERN)
+// @FeatureRequirementSet(FeatureRequirementSet.Package.SIMPLE)
+// def shouldConstructDetachedEdgeAsReference() {
+//
+// graph.traversal().E(convertToEdgeId("marko", "knows", "vadas")).next().property("year", 2002);
+// val detachedEdge = DetachedFactory.detach(g.E(convertToEdgeId("marko", "knows", "vadas")).next(), false);
+//// assertEquals(convertToEdgeId("marko", "knows", "vadas"), detachedEdge.id());
+//// assertEquals("knows", detachedEdge.label());
+//// assertEquals(DetachedVertex.class, detachedEdge.vertices(Direction.OUT).next().getClass());
+//// assertEquals(convertToVertexId("marko"), detachedEdge.vertices(Direction.OUT).next().id());
+//// assertEquals("person", detachedEdge.vertices(Direction.IN).next().label());
+//// assertEquals(DetachedVertex.class, detachedEdge.vertices(Direction.IN).next().getClass());
+//// assertEquals(convertToVertexId("vadas"), detachedEdge.vertices(Direction.IN).next().id());
+//// assertEquals("person", detachedEdge.vertices(Direction.IN).next().label());
+////
+//// assertEquals(0, IteratorUtils.count(detachedEdge.properties()));
+// }
+//// shouldConstructDetachedEdgeAsReference()
+// }
+ def convertToEdgeId(g: GraphTraversalSource, outVertexName: String, edgeLabel: String, inVertexName: String): AnyRef = {
+ g.V().has("name", outVertexName).outE(edgeLabel).as("e").inV.has("name", inVertexName).select[Edge]("e").next().id()
+ }
+ test("ccc") {
val mnt = graph.management
val defaultService = graph.DefaultService
val defaultServiceColumn = graph.DefaultColumn
@@ -201,90 +338,29 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels {
labelNames.foreach { labelName =>
Management.deleteLabel(labelName)
}
-
+ val personColumn = Management.createServiceColumn(defaultService.serviceName, "person", "integer",
+ Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("age", "0", "integer"), Prop("location", "-", "string")))
val knows = mnt.createLabel("knows",
- defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- true, defaultService.serviceName, Nil, Seq(Prop("since", "0", "integer")), consistencyLevel = "strong", None, None,
- options = Option("""{"skipReverse": false}"""))
-
- val pets = mnt.createLabel("pets",
- defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- true, defaultService.serviceName, Nil, Nil, consistencyLevel = "strong", None, None,
- options = Option("""{"skipReverse": false}"""))
-
- val walks = mnt.createLabel("walks",
- defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- true, defaultService.serviceName, Nil, Seq(Prop("location", "-", "string")), consistencyLevel = "strong", None, None,
- options = Option("""{"skipReverse": false}"""))
-
- val livesWith = mnt.createLabel("livesWith",
- defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- true, defaultService.serviceName, Nil, Nil, consistencyLevel = "strong", None, None,
- options = Option("""{"skipReverse": false}"""))
-
- val friend = mnt.createLabel("friend", defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType,
- true, defaultService.serviceName, Nil,
- Seq(
- Prop("name", "-", "string"),
- Prop("location", "-", "string"),
- Prop("status", "-", "string")
- ),
- "strong", None, None,
- options = Option("""{"skipReverse": false}""")
- )
-// (0 until 2).foreach(i => graph.addVertex("myId", Int.box(i)))
-//
-// graph.vertices().foreach(v =>
-// graph.vertices().foreach(u => v.addEdge("knows", u, "myEdgeId", Int.box(12)))
-// )
-//
-// val v = graph.vertices().toSeq.head
-// v.remove()
-//
-// graph.edges().foreach(e =>
-// logger.error(s"[Edge]: $e")
-// )
+ defaultService.serviceName, "person", "integer",
+ defaultService.serviceName, "person", "integer",
+ true, defaultService.serviceName, Nil, Seq(Prop("since", "0", "integer"), Prop("year", "0", "integer")), consistencyLevel = "strong", None, None)
+ val created = mnt.createLabel("created",
+ defaultService.serviceName, "person", "integer",
+ defaultService.serviceName, "person", "integer",
+ true, defaultService.serviceName, Nil, Seq(Prop("weight", "0.0", "double")), "strong", None, None)
-// val v1 = graph.addVertex(T.id, "v1", "name", "marko")
-// val v2 = graph.addVertex(T.id, "101", "name", "puppy")
-// v1.addEdge("knows", v2, "since", Int.box(2010))
-// v1.addEdge("pets", v2)
-// v1.addEdge("walks", v2, "location", "arroyo")
-// v2.addEdge("knows", v1, "since", Int.box(2010))
-//
-// v1.edges(Direction.BOTH).foreach(edge => {
-// v1.addEdge("livesWith", v2)
-// v1.addEdge("walks", v2, "location", "river")
-// edge.remove()
-// })
-//
-// val edges = v1.edges(Direction.BOTH)
-// edges.foreach { e =>
-// logger.error(s"[Before]: $e")
-// e.remove()
-// }
-//
-// v1.edges(Direction.OUT).foreach { e =>
-// logger.error(s"[V1.Edge]: $e")
-// }
-// v2.edges(Direction.BOTH).foreach { e =>
-// logger.error(s"[V2.Edge]: $e")
-// }
- (0 until 25).foreach { i =>
- val v = graph.addVertex()
- v.addEdge("friend", v)
- }
- graph.vertices().foreach(v => logger.error(s"[Vertex]: $v"))
- graph.edges().foreach(e => logger.error(s"[Edge]: $e"))
+ val g = graph.traversal()
+ val v1 = graph.addVertex(T.label, "person", T.id, Int.box(1), "name", "josh")
+ val v4 = graph.addVertex(T.label, "person", T.id, Int.box(4), "name", "lop")
+ val e = v1.addEdge("created", v4)
- graph.edges().foreach(e => e.remove)
+ val toDetach = g.E(convertToEdgeId(g, "josh", "created", "lop")).next()
+ val outV = toDetach.vertices(Direction.OUT).next()
+ val detachedEdge = DetachedFactory.detach(toDetach, true)
+ val attached = detachedEdge.attach(Attachable.Method.get(outV))
- graph.edges().foreach(e => logger.error(s"[Edge]: $e"))
+ assert(toDetach.equals(attached))
+ assert(!attached.isInstanceOf[DetachedEdge])
}
-
}
\ No newline at end of file