You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/10/01 07:32:15 UTC

[1/2] incubator-s2graph git commit: [S2GRAPH-116] using ASM and ByteBuddy to add a proxy to Asynchbase's Scanner

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master c50d43a06 -> 38873aa08


[S2GRAPH-116] using ASM and ByteBuddy to add a proxy to Asynchbase's Scanner


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/555e59f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/555e59f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/555e59f9

Branch: refs/heads/master
Commit: 555e59f9882a24b906dcce5ddf8223629c0f0cc9
Parents: c50d43a
Author: Jong Wook Kim <jo...@nyu.edu>
Authored: Tue Sep 27 02:28:30 2016 -0400
Committer: Jong Wook Kim <jo...@nyu.edu>
Committed: Sat Oct 1 02:03:54 2016 -0400

----------------------------------------------------------------------
 s2core/build.sbt                                |   3 +-
 .../core/storage/hbase/AsynchbasePatcher.scala  | 131 +++++++++++++++++++
 .../core/storage/hbase/AsynchbaseStorage.scala  |  19 +--
 3 files changed, 144 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/555e59f9/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index bcaac44..a456ce0 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -40,7 +40,8 @@ libraryDependencies ++= Seq(
   "com.github.danielwegener" % "logback-kafka-appender" % "0.0.4",
   "com.stumbleupon" % "async" % "1.4.1",
   "io.netty" % "netty" % "3.9.4.Final" force(),
-  "org.hbase" % "asynchbase" % "1.7.2-S2GRAPH" from "https://github.com/SteamShon/asynchbase/raw/mvn-repo/org/hbase/asynchbase/1.7.2-S2GRAPH/asynchbase-1.7.2-S2GRAPH.jar"
+  "org.hbase" % "asynchbase" % "1.7.2",
+  "net.bytebuddy" % "byte-buddy" % "1.4.26"
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/555e59f9/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbasePatcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbasePatcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbasePatcher.scala
new file mode 100644
index 0000000..3eaebfe
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbasePatcher.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import java.lang.Integer.valueOf
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.Callable
+
+import net.bytebuddy.ByteBuddy
+import net.bytebuddy.description.modifier.Visibility.PUBLIC
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy
+import net.bytebuddy.implementation.FieldAccessor
+import net.bytebuddy.implementation.MethodDelegation.to
+import net.bytebuddy.implementation.bind.annotation.{SuperCall, This}
+import net.bytebuddy.matcher.ElementMatchers._
+import org.apache.commons.io.IOUtils
+import org.hbase.async._
+import org.objectweb.asm.Opcodes.{ACC_FINAL, ACC_PRIVATE, ACC_PROTECTED, ACC_PUBLIC}
+import org.objectweb.asm._
+
+import scala.collection.JavaConversions._
+
+/**
+  * Upon initialization, it loads a patched version of Asynchbase's Scanner class,
+  * modified using ASM so that the class is non-final and all of its methods are public,
+  * so that ByteBuddy can create a subclass of it.
+  *
+  * This object has to be initialized before any access to (i.e. any classloading of) Asynchbase,
+  * since the ClassLoader does not allow redefining already loaded classes unless we use instrumentation.
+  */
+object AsynchbasePatcher {
+
+  /** invoking this method will force the classloading of this object, thus triggering the patch mechanism below */
+  def init(): Unit = {
+    val methods = scannerClass.getMethods.map(_.getName)
+    assert(methods.contains("getRpcTimeout"))
+    assert(methods.contains("setRpcTimeout"))
+  }
+
+  /** instantiate a new Scanner, patched to support RPC timeout */
+  def newScanner(client: HBaseClient, table: Array[Byte]): ScannerExtra = {
+    val constructor = scannerClass.getConstructor(classOf[HBaseClient], BA)
+    constructor.setAccessible(true)
+    constructor.newInstance(client, table).asInstanceOf[ScannerExtra]
+  }
+
+  /** instantiate a new Scanner, patched to support RPC timeout */
+  def newScanner(client: HBaseClient, table: String): ScannerExtra = {
+    newScanner(client, table.getBytes(StandardCharsets.UTF_8))
+  }
+
+
+  trait RpcTimeout {
+    def getRpcTimeout: Int
+    def setRpcTimeout(timeout: Int): Unit
+  }
+
+  type ScannerExtra = Scanner with RpcTimeout
+
+  val interceptor = new Object() {
+    def getNextRowsRequest(@This scanner: ScannerExtra, @SuperCall getNextRowsRequest: Callable[HBaseRpc]): HBaseRpc = {
+      val request = getNextRowsRequest.call()
+      val rpcTimeout = scanner.getRpcTimeout
+      if (rpcTimeout > 0) {
+        request.setTimeout(rpcTimeout)
+      }
+      request
+    }
+  }
+
+  private val BA = classOf[Array[Byte]]
+  private val classLoader = getClass.getClassLoader
+  private val defineClass = classOf[ClassLoader].getDeclaredMethod("defineClass", classOf[String], BA, classOf[Int], classOf[Int])
+
+  /** a java.lang.Class instance for the patched Scanner class */
+  private val scannerClass = {
+    new ByteBuddy()
+      .subclass(loadClass("Scanner"))
+      .name("org.hbase.async.ScannerEx")
+      .implement(classOf[RpcTimeout]).intercept(FieldAccessor.ofBeanProperty())
+      .defineField("rpcTimeout", classOf[Int], PUBLIC)
+      .method(named("getNextRowsRequest")).intercept(to(interceptor))
+      .make.load(classLoader, ClassLoadingStrategy.Default.INJECTION).getLoaded
+  }
+
+  /** Loads the named Asynchbase class from s2core's classpath and defines a patched (non-final, all-methods-public) copy of it.
+    * *MUST* be called before any access to that class;
+    * otherwise the classloading will fail with an "attempted duplicate class definition" error.
+    **/
+  private def loadClass(name: String): Class[_] = {
+    classLoader.getResources(s"org/hbase/async/$name.class").toSeq.headOption match {
+      case Some(url) =>
+        val stream = url.openStream()
+        val bytes = try { IOUtils.toByteArray(stream) } finally { stream.close() }
+
+        // patch the bytecode so that the class is no longer final and the methods are all accessible
+        val cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES)
+        new ClassReader(bytes).accept(new ClassAdapter(cw) {
+          override def visit(version: Int, access: Int, name: String, signature: String, superName: String, interfaces: Array[String]): Unit = {
+            super.visit(version, access & ~ACC_FINAL, name, signature, superName, interfaces)
+          }
+          override def visitMethod(access: Int, name: String, desc: String, signature: String, exceptions: Array[String]): MethodVisitor = {
+            super.visitMethod(access & ~ACC_PRIVATE & ~ACC_PROTECTED & ~ACC_FINAL | ACC_PUBLIC, name, desc, signature, exceptions)
+          }
+        }, 0)
+        val patched = cw.toByteArray
+
+        defineClass.setAccessible(true)
+        defineClass.invoke(classLoader, s"org.hbase.async.$name", patched, valueOf(0), valueOf(patched.length)).asInstanceOf[Class[_]]
+      case None =>
+        throw new ClassNotFoundException(s"Could not find Asynchbase class: $name")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/555e59f9/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 2800f46..783fd8a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -37,6 +37,7 @@ import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.types.{HBaseType, VertexId}
 import org.apache.s2graph.core.utils._
+import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
 import org.hbase.async._
 
 import scala.collection.JavaConversions._
@@ -51,6 +52,7 @@ object AsynchbaseStorage {
   val edgeCf = Serializable.edgeCf
   val emptyKVs = new util.ArrayList[KeyValue]()
 
+  AsynchbasePatcher.init()
 
   def makeClient(config: Config, overrideKv: (String, String)*) = {
     val asyncConfig: org.hbase.async.Config =
@@ -198,13 +200,13 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
 
     label.schemaVersion match {
       case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
-        val scanner = client.newScanner(label.hbaseTableName.getBytes)
+        val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
         scanner.setFamily(edgeCf)
 
         /*
          * TODO: remove this part.
          */
-        val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
+        val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
         val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
 
         val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
@@ -246,19 +248,20 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
         // SET option for this rpc properly.
         scanner
       case _ =>
-        val get =
-          if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
-          else new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
+        val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
+        } else {
+          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
+        }
 
         get.maxVersions(1)
         get.setFailfast(true)
-        get.setMaxResultsPerColumnFamily(queryParam.limit)
-        get.setRowOffsetPerColumnFamily(queryParam.offset)
         get.setMinTimestamp(minTs)
         get.setMaxTimestamp(maxTs)
         get.setTimeout(queryParam.rpcTimeoutInMillis)
 
-        if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter)
+        val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
+        get.setFilter(new FilterList(pagination +: Option(queryParam.columnRangeFilter).toSeq, MUST_PASS_ALL))
 
         get
     }


[2/2] incubator-s2graph git commit: [S2GRAPH-116] Use the official Asynchbase release and stop managing our custom fork

Posted by st...@apache.org.
[S2GRAPH-116] Use the official Asynchbase release and stop managing our custom fork

JIRA:
    [S2GRAPH-116] https://issues.apache.org/jira/browse/S2GRAPH-116

Pull Request:
    Closes #85

Authors
    Jong Wook Kim: jongwook@nyu.edu


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/38873aa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/38873aa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/38873aa0

Branch: refs/heads/master
Commit: 38873aa0816d904c40d991939cef7b64c14ea4a3
Parents: 555e59f
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Oct 1 16:25:23 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Oct 1 16:30:49 2016 +0900

----------------------------------------------------------------------
 CHANGES | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/38873aa0/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0a92f05..106bc9f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -79,6 +79,10 @@ Release 0.1.0 - unreleased
 
     S2GRAPH-82: Merge DeferCache and FutureCache (Committed by Daewon Jeong).
 
+    S2GRAPH-116: using ASM and ByteBuddy to add a proxy to Asynchbase's Scanner.
+		(Contributed by Jong Wook Kim<jo...@nyu.edu>, committed by DOYUNG YOON)
+
+
   BUG FIXES
 
     S2GRAPH-18: Query Option "interval" is Broken.