You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by da...@apache.org on 2018/06/04 09:21:51 UTC

[2/5] incubator-s2graph git commit: implement jdbc driver for H2 DB

implement jdbc driver for H2 DB


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2b1c88a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2b1c88a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2b1c88a8

Branch: refs/heads/master
Commit: 2b1c88a81d1b9524d6746a705f32db4c4c407d55
Parents: 3ddea3f
Author: daewon <da...@apache.org>
Authored: Mon May 28 12:59:42 2018 +0900
Committer: daewon <da...@apache.org>
Committed: Mon May 28 13:01:20 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/core/ResourceManager.scala   | 13 +--
 .../core/storage/jdbc/JdbcEdgeFetcher.scala     | 19 +++++
 .../core/storage/jdbc/JdbcEdgeMutator.scala     | 90 ++++++++++++++------
 .../s2graph/core/utils/SafeUpdateCache.scala    | 10 ++-
 .../core/storage/jdbc/JdbcStorageTest.scala     | 22 ++++-
 5 files changed, 119 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
index 423b3c9..051ca9f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -74,14 +74,17 @@ class ResourceManager(graph: S2GraphLike,
 
   def onEvict(oldValue: AnyRef): Unit = {
     oldValue match {
-      case o: Option[_] => o.foreach {
-        case v: AutoCloseable => v.close()
+      case o: Option[_] => o.foreach { case v: AutoCloseable =>
+        v.close()
+        logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
       }
-      case v: AutoCloseable => v.close()
+
+      case v: AutoCloseable =>
+        v.close()
+        logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
+
       case _ => logger.info(s"Class does't have close() method ${oldValue.getClass.getName}")
     }
-
-    logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
   }
 
   def getOrElseUpdateVertexFetcher(column: ServiceColumn,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
index 927635e..ea3df74 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.core.storage.jdbc
 
 import com.typesafe.config.Config

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
index 96c214b..1650a2d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
@@ -23,6 +23,7 @@ import com.typesafe.config.Config
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.core.storage.MutateResponse
+import org.apache.s2graph.core.utils.logger
 import org.joda.time.DateTime
 import scalikejdbc._
 
@@ -40,44 +41,79 @@ class JdbcEdgeMutator(graph: S2GraphLike) extends EdgeMutator {
     createTable(label)
   }
 
-  override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
-    _edges.groupBy(_.innerLabel).flatMap { case (label, edges) =>
-      val affectedColumns = JdbcStorage.affectedColumns(label)
-
-      val insertValues = edges.map { edge =>
-        val values = affectedColumns.collect {
-          case "_timestamp" => new DateTime(edge.ts)
-          case "_from" => edge.srcForVertex.innerId.value
-          case "_to" => edge.tgtForVertex.innerId.value
-          case k: String => edge.propertyValue(k).map(iv => iv.innerVal.value).orNull
-        }
-
-        values
-      }
+  def extractValues(affectedColumns: Seq[String], edge: S2EdgeLike): Seq[Any] = {
+    val values = affectedColumns.collect {
+      case "_timestamp" => new DateTime(edge.ts)
+      case "_from" => edge.srcForVertex.innerId.value
+      case "_to" => edge.tgtForVertex.innerId.value
+      case k: String => edge.propertyValue(k).map(iv => iv.innerVal.value).orNull
+    }
+
+    values
+  }
+
+  def keyColumns(label: Label): Seq[String] =
+    if (label.consistencyLevel == "strong") Seq("_from", "_to")
+    else Seq("_from", "_to", "_timestamp")
+
+  def insertOp(label: Label, edges: Seq[S2EdgeLike]): Seq[Boolean] = {
+    val table = JdbcStorage.labelTableName(label)
+    val affectedColumns = JdbcStorage.affectedColumns(label)
+    val insertValues = edges.map { edge => extractValues(affectedColumns, edge) }
+
+    val columns = affectedColumns.mkString(", ")
+    val prepared = affectedColumns.map(_ => "?").mkString(", ")
+
+    val conflictCheckKeys = keyColumns(label).mkString("(", ",", ")")
+
+    val sqlRaw = s"""MERGE INTO ${table} (${columns}) KEY ${conflictCheckKeys} VALUES (${prepared});"""
+    logger.debug(sqlRaw)
+
+    val sql = SQLSyntax.createUnsafely(sqlRaw)
+    val ret = withTxSession { session =>
+      sql"""${sql}""".batch(insertValues: _*).apply()(session)
+    }
+
+    ret.map(_ => true)
+  }
 
-      val columns = affectedColumns.mkString(", ")
-      val table = JdbcStorage.labelTableName(label)
-      val prepared = affectedColumns.map(_ => "?").mkString(", ")
+  def deleteOp(label: Label, edges: Seq[S2EdgeLike]): Seq[Boolean] = {
+    val table = JdbcStorage.labelTableName(label)
 
-      val conflictCheckKeys =
-        if (label.consistencyLevel == "strong") "(_from, _to)"
-        else "(_from, _to, _timestamp)"
+    val keyColumnLs = keyColumns(label)
+    val deleteValues = edges.map { edge => extractValues(keyColumnLs, edge) }
+    val prepared = keyColumnLs.map(k => s"${k} = ?").mkString(" AND ")
 
-      val sqlRaw =
-        s"""MERGE INTO ${table} (${columns}) KEY ${conflictCheckKeys} VALUES (${prepared})""".stripMargin
+    val sqlRaw = s"""DELETE FROM ${table} WHERE ($prepared);"""
+    logger.debug(sqlRaw)
 
+    val ret = withTxSession { session =>
       val sql = SQLSyntax.createUnsafely(sqlRaw)
-      withTxSession { session =>
-        sql"""${sql}""".batch(insertValues: _*).apply()(session)
+      deleteValues.map { deleteValue =>
+        sql"""${sql}""".batch(Seq(deleteValue): _*).apply()(session)
       }
+    }
+
+    ret.map(_ => true)
+  }
 
-      insertValues
+  override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    val ret = _edges.groupBy(_.innerLabel).flatMap { case (label, edges) =>
+      val (edgesToDelete, edgesToInsert) = edges.partition(_.getOperation() == "delete")
+
+      insertOp(label, edgesToInsert) ++ deleteOp(label, edgesToDelete)
     }
 
-    Future.successful(_edges.map(_ => true))
+    Future.successful(ret.toSeq)
   }
 
-  override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = ???
+  override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+    val ret = mutateStrongEdges(zkQuorum, _edges, withWait).map { ret =>
+      ret.zipWithIndex.map { case (r, i) => i -> r }
+    }
+
+    ret
+  }
 
   override def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = ???
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index 323ee9d..755b8d0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -124,7 +124,10 @@ class SafeUpdateCache(val config: Config)
   }
 
   def defaultOnEvict(oldValue: AnyRef): Unit = {
-    logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted.")
+    oldValue match {
+      case None =>
+      case _ => logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted.")
+    }
   }
 
   def withCache[T <: AnyRef](key: String,
@@ -162,7 +165,10 @@ class SafeUpdateCache(val config: Config)
 
               onEvict(cachedVal)
 
-              logger.info(s"withCache update success: $cacheKey")
+              cachedVal match {
+                case None =>
+                case _ => logger.info(s"withCache update success: $cacheKey")
+              }
           }
 
           cachedVal

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
index f06b2e6..424e5ba 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
@@ -245,9 +245,19 @@ class JdbcStorageTest extends BaseFetcherTest {
     )
 
     fetchedEdges shouldBe Seq(edgeElricUpdated, edgeShon, edgeRain) // order by timestamp desc
+
+    // delete
+    val deleteEdge = Seq(edgeElric.copyOp(GraphUtil.operations("delete")))
+    Await.ready(mutator.mutateStrongEdges("", deleteEdge, true), Duration("10 sec"))
+
+    val fetchedEdgesAfterDeleted = Await.result(
+      fetcher.fetches(Seq(qr), Map.empty), Duration("10 sec")).flatMap(_.edgeWithScores.map(_.edge)
+    )
+
+    fetchedEdgesAfterDeleted shouldBe Seq(edgeShon, edgeRain) // elric was deleted
   }
 
-  test("Mutate and fetch edges - label C") {
+  test("Mutate and fetch edges - label C(weak)") {
     val label = Label.findByName("C", useCache = false).get
 
     JdbcStorage.dropTable(label)
@@ -277,5 +287,15 @@ class JdbcStorageTest extends BaseFetcherTest {
     )
 
     fetchedEdges shouldBe Seq(edgeShon, edgeShon2)
+
+    // delete
+    val deleteEdge = Seq(edgeShon.copyOp(GraphUtil.operations("delete")))
+    Await.ready(mutator.mutateStrongEdges("", deleteEdge, true), Duration("10 sec"))
+
+    val fetchedEdgesAfterDeleted = Await.result(
+      fetcher.fetches(Seq(qr), Map.empty), Duration("10 sec")).flatMap(_.edgeWithScores.map(_.edge)
+    )
+
+    fetchedEdgesAfterDeleted shouldBe Seq(edgeShon2) // ts is diff
   }
 }