You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by da...@apache.org on 2018/06/04 09:21:51 UTC
[2/5] incubator-s2graph git commit: implement jdbc driver for H2 DB
implement jdbc driver for H2 DB
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2b1c88a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2b1c88a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2b1c88a8
Branch: refs/heads/master
Commit: 2b1c88a81d1b9524d6746a705f32db4c4c407d55
Parents: 3ddea3f
Author: daewon <da...@apache.org>
Authored: Mon May 28 12:59:42 2018 +0900
Committer: daewon <da...@apache.org>
Committed: Mon May 28 13:01:20 2018 +0900
----------------------------------------------------------------------
.../apache/s2graph/core/ResourceManager.scala | 13 +--
.../core/storage/jdbc/JdbcEdgeFetcher.scala | 19 +++++
.../core/storage/jdbc/JdbcEdgeMutator.scala | 90 ++++++++++++++------
.../s2graph/core/utils/SafeUpdateCache.scala | 10 ++-
.../core/storage/jdbc/JdbcStorageTest.scala | 22 ++++-
5 files changed, 119 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
index 423b3c9..051ca9f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -74,14 +74,17 @@ class ResourceManager(graph: S2GraphLike,
def onEvict(oldValue: AnyRef): Unit = {
oldValue match {
- case o: Option[_] => o.foreach {
- case v: AutoCloseable => v.close()
+ case o: Option[_] => o.foreach { case v: AutoCloseable =>
+ v.close()
+ logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
}
- case v: AutoCloseable => v.close()
+
+ case v: AutoCloseable =>
+ v.close()
+ logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
+
case _ => logger.info(s"Class does't have close() method ${oldValue.getClass.getName}")
}
-
- logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
}
def getOrElseUpdateVertexFetcher(column: ServiceColumn,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
index 927635e..ea3df74 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.s2graph.core.storage.jdbc
import com.typesafe.config.Config
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
index 96c214b..1650a2d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
@@ -23,6 +23,7 @@ import com.typesafe.config.Config
import org.apache.s2graph.core._
import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.core.storage.MutateResponse
+import org.apache.s2graph.core.utils.logger
import org.joda.time.DateTime
import scalikejdbc._
@@ -40,44 +41,79 @@ class JdbcEdgeMutator(graph: S2GraphLike) extends EdgeMutator {
createTable(label)
}
- override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
- _edges.groupBy(_.innerLabel).flatMap { case (label, edges) =>
- val affectedColumns = JdbcStorage.affectedColumns(label)
-
- val insertValues = edges.map { edge =>
- val values = affectedColumns.collect {
- case "_timestamp" => new DateTime(edge.ts)
- case "_from" => edge.srcForVertex.innerId.value
- case "_to" => edge.tgtForVertex.innerId.value
- case k: String => edge.propertyValue(k).map(iv => iv.innerVal.value).orNull
- }
-
- values
- }
+ def extractValues(affectedColumns: Seq[String], edge: S2EdgeLike): Seq[Any] = {
+ val values = affectedColumns.collect {
+ case "_timestamp" => new DateTime(edge.ts)
+ case "_from" => edge.srcForVertex.innerId.value
+ case "_to" => edge.tgtForVertex.innerId.value
+ case k: String => edge.propertyValue(k).map(iv => iv.innerVal.value).orNull
+ }
+
+ values
+ }
+
+ def keyColumns(label: Label): Seq[String] =
+ if (label.consistencyLevel == "strong") Seq("_from", "_to")
+ else Seq("_from", "_to", "_timestamp")
+
+ def insertOp(label: Label, edges: Seq[S2EdgeLike]): Seq[Boolean] = {
+ val table = JdbcStorage.labelTableName(label)
+ val affectedColumns = JdbcStorage.affectedColumns(label)
+ val insertValues = edges.map { edge => extractValues(affectedColumns, edge) }
+
+ val columns = affectedColumns.mkString(", ")
+ val prepared = affectedColumns.map(_ => "?").mkString(", ")
+
+ val conflictCheckKeys = keyColumns(label).mkString("(", ",", ")")
+
+ val sqlRaw = s"""MERGE INTO ${table} (${columns}) KEY ${conflictCheckKeys} VALUES (${prepared});"""
+ logger.debug(sqlRaw)
+
+ val sql = SQLSyntax.createUnsafely(sqlRaw)
+ val ret = withTxSession { session =>
+ sql"""${sql}""".batch(insertValues: _*).apply()(session)
+ }
+
+ ret.map(_ => true)
+ }
- val columns = affectedColumns.mkString(", ")
- val table = JdbcStorage.labelTableName(label)
- val prepared = affectedColumns.map(_ => "?").mkString(", ")
+ def deleteOp(label: Label, edges: Seq[S2EdgeLike]): Seq[Boolean] = {
+ val table = JdbcStorage.labelTableName(label)
- val conflictCheckKeys =
- if (label.consistencyLevel == "strong") "(_from, _to)"
- else "(_from, _to, _timestamp)"
+ val keyColumnLs = keyColumns(label)
+ val deleteValues = edges.map { edge => extractValues(keyColumnLs, edge) }
+ val prepared = keyColumnLs.map(k => s"${k} = ?").mkString(" AND ")
- val sqlRaw =
- s"""MERGE INTO ${table} (${columns}) KEY ${conflictCheckKeys} VALUES (${prepared})""".stripMargin
+ val sqlRaw = s"""DELETE FROM ${table} WHERE ($prepared);"""
+ logger.debug(sqlRaw)
+ val ret = withTxSession { session =>
val sql = SQLSyntax.createUnsafely(sqlRaw)
- withTxSession { session =>
- sql"""${sql}""".batch(insertValues: _*).apply()(session)
+ deleteValues.map { deleteValue =>
+ sql"""${sql}""".batch(Seq(deleteValue): _*).apply()(session)
}
+ }
+
+ ret.map(_ => true)
+ }
- insertValues
+ override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ val ret = _edges.groupBy(_.innerLabel).flatMap { case (label, edges) =>
+ val (edgesToDelete, edgesToInsert) = edges.partition(_.getOperation() == "delete")
+
+ insertOp(label, edgesToInsert) ++ deleteOp(label, edgesToDelete)
}
- Future.successful(_edges.map(_ => true))
+ Future.successful(ret.toSeq)
}
- override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = ???
+ override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+ val ret = mutateStrongEdges(zkQuorum, _edges, withWait).map { ret =>
+ ret.zipWithIndex.map { case (r, i) => i -> r }
+ }
+
+ ret
+ }
override def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = ???
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index 323ee9d..755b8d0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -124,7 +124,10 @@ class SafeUpdateCache(val config: Config)
}
def defaultOnEvict(oldValue: AnyRef): Unit = {
- logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted.")
+ oldValue match {
+ case None =>
+ case _ => logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted.")
+ }
}
def withCache[T <: AnyRef](key: String,
@@ -162,7 +165,10 @@ class SafeUpdateCache(val config: Config)
onEvict(cachedVal)
- logger.info(s"withCache update success: $cacheKey")
+ cachedVal match {
+ case None =>
+ case _ => logger.info(s"withCache update success: $cacheKey")
+ }
}
cachedVal
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
index f06b2e6..424e5ba 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
@@ -245,9 +245,19 @@ class JdbcStorageTest extends BaseFetcherTest {
)
fetchedEdges shouldBe Seq(edgeElricUpdated, edgeShon, edgeRain) // order by timestamp desc
+
+ // delete
+ val deleteEdge = Seq(edgeElric.copyOp(GraphUtil.operations("delete")))
+ Await.ready(mutator.mutateStrongEdges("", deleteEdge, true), Duration("10 sec"))
+
+ val fetchedEdgesAfterDeleted = Await.result(
+ fetcher.fetches(Seq(qr), Map.empty), Duration("10 sec")).flatMap(_.edgeWithScores.map(_.edge)
+ )
+
+ fetchedEdgesAfterDeleted shouldBe Seq(edgeShon, edgeRain) // elric was deleted
}
- test("Mutate and fetch edges - label C") {
+ test("Mutate and fetch edges - label C(weak)") {
val label = Label.findByName("C", useCache = false).get
JdbcStorage.dropTable(label)
@@ -277,5 +287,15 @@ class JdbcStorageTest extends BaseFetcherTest {
)
fetchedEdges shouldBe Seq(edgeShon, edgeShon2)
+
+ // delete
+ val deleteEdge = Seq(edgeShon.copyOp(GraphUtil.operations("delete")))
+ Await.ready(mutator.mutateStrongEdges("", deleteEdge, true), Duration("10 sec"))
+
+ val fetchedEdgesAfterDeleted = Await.result(
+ fetcher.fetches(Seq(qr), Map.empty), Duration("10 sec")).flatMap(_.edgeWithScores.map(_.edge)
+ )
+
+ fetchedEdgesAfterDeleted shouldBe Seq(edgeShon2) // ts is diff
}
}