You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/10/01 07:32:15 UTC
[1/2] incubator-s2graph git commit: [S2GRAPH-116] using ASM and
ByteBuddy to add a proxy to Asynchbase's Scanner
Repository: incubator-s2graph
Updated Branches:
refs/heads/master c50d43a06 -> 38873aa08
[S2GRAPH-116] using ASM and ByteBuddy to add a proxy to Asynchbase's Scanner
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/555e59f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/555e59f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/555e59f9
Branch: refs/heads/master
Commit: 555e59f9882a24b906dcce5ddf8223629c0f0cc9
Parents: c50d43a
Author: Jong Wook Kim <jo...@nyu.edu>
Authored: Tue Sep 27 02:28:30 2016 -0400
Committer: Jong Wook Kim <jo...@nyu.edu>
Committed: Sat Oct 1 02:03:54 2016 -0400
----------------------------------------------------------------------
s2core/build.sbt | 3 +-
.../core/storage/hbase/AsynchbasePatcher.scala | 131 +++++++++++++++++++
.../core/storage/hbase/AsynchbaseStorage.scala | 19 +--
3 files changed, 144 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/555e59f9/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index bcaac44..a456ce0 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -40,7 +40,8 @@ libraryDependencies ++= Seq(
"com.github.danielwegener" % "logback-kafka-appender" % "0.0.4",
"com.stumbleupon" % "async" % "1.4.1",
"io.netty" % "netty" % "3.9.4.Final" force(),
- "org.hbase" % "asynchbase" % "1.7.2-S2GRAPH" from "https://github.com/SteamShon/asynchbase/raw/mvn-repo/org/hbase/asynchbase/1.7.2-S2GRAPH/asynchbase-1.7.2-S2GRAPH.jar"
+ "org.hbase" % "asynchbase" % "1.7.2",
+ "net.bytebuddy" % "byte-buddy" % "1.4.26"
)
libraryDependencies := {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/555e59f9/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbasePatcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbasePatcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbasePatcher.scala
new file mode 100644
index 0000000..3eaebfe
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbasePatcher.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import java.lang.Integer.valueOf
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.Callable
+
+import net.bytebuddy.ByteBuddy
+import net.bytebuddy.description.modifier.Visibility.PUBLIC
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy
+import net.bytebuddy.implementation.FieldAccessor
+import net.bytebuddy.implementation.MethodDelegation.to
+import net.bytebuddy.implementation.bind.annotation.{SuperCall, This}
+import net.bytebuddy.matcher.ElementMatchers._
+import org.apache.commons.io.IOUtils
+import org.hbase.async._
+import org.objectweb.asm.Opcodes.{ACC_FINAL, ACC_PRIVATE, ACC_PROTECTED, ACC_PUBLIC}
+import org.objectweb.asm._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Upon initialization, it loads a patched version of Asynchbase's Scanner class,
+ * modified using ASM so that the class is non-final and all of its methods are public,
+ * which allows ByteBuddy to create a subclass of it.
+ *
+ * This object has to be initialized before any access to (i.e. any classloading of) Asynchbase,
+ * since the ClassLoader does not allow redefining already loaded classes unless we use instrumentation.
+ */
+object AsynchbasePatcher {
+
+ /** invoking this method will force the classloading of this object, thus triggering the patch mechanism below */
+ def init(): Unit = {
+ val methods = scannerClass.getMethods.map(_.getName)
+ assert(methods.contains("getRpcTimeout"))
+ assert(methods.contains("setRpcTimeout"))
+ }
+
+ /** instantiate a new Scanner, patched to support RPC timeout */
+ def newScanner(client: HBaseClient, table: Array[Byte]): ScannerExtra = {
+ val constructor = scannerClass.getConstructor(classOf[HBaseClient], BA)
+ constructor.setAccessible(true)
+ constructor.newInstance(client, table).asInstanceOf[ScannerExtra]
+ }
+
+ /** instantiate a new Scanner, patched to support RPC timeout */
+ def newScanner(client: HBaseClient, table: String): ScannerExtra = {
+ newScanner(client, table.getBytes(StandardCharsets.UTF_8))
+ }
+
+
+ trait RpcTimeout {
+ def getRpcTimeout: Int
+ def setRpcTimeout(timeout: Int): Unit
+ }
+
+ type ScannerExtra = Scanner with RpcTimeout
+
+ val interceptor = new Object() {
+ def getNextRowsRequest(@This scanner: ScannerExtra, @SuperCall getNextRowsRequest: Callable[HBaseRpc]): HBaseRpc = {
+ val request = getNextRowsRequest.call()
+ val rpcTimeout = scanner.getRpcTimeout
+ if (rpcTimeout > 0) {
+ request.setTimeout(rpcTimeout)
+ }
+ request
+ }
+ }
+
+ private val BA = classOf[Array[Byte]]
+ private val classLoader = getClass.getClassLoader
+ private val defineClass = classOf[ClassLoader].getDeclaredMethod("defineClass", classOf[String], BA, classOf[Int], classOf[Int])
+
+ /** a java.lang.Class instance for the patched Scanner class */
+ private val scannerClass = {
+ new ByteBuddy()
+ .subclass(loadClass("Scanner"))
+ .name("org.hbase.async.ScannerEx")
+ .implement(classOf[RpcTimeout]).intercept(FieldAccessor.ofBeanProperty())
+ .defineField("rpcTimeout", classOf[Int], PUBLIC)
+ .method(named("getNextRowsRequest")).intercept(to(interceptor))
+ .make.load(classLoader, ClassLoadingStrategy.Default.INJECTION).getLoaded
+ }
+
+ /** Loads an Asynchbase class from s2core's classpath, patches its bytecode, and defines it.
+ * It *MUST* be called before any access to that class;
+ * otherwise the classloading will fail with an "attempted duplicate class definition" error.
+ **/
+ private def loadClass(name: String): Class[_] = {
+ classLoader.getResources(s"org/hbase/async/$name.class").toSeq.headOption match {
+ case Some(url) =>
+ val stream = url.openStream()
+ val bytes = try { IOUtils.toByteArray(stream) } finally { stream.close() }
+
+ // patch the bytecode so that the class is no longer final and the methods are all accessible
+ val cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES)
+ new ClassReader(bytes).accept(new ClassAdapter(cw) {
+ override def visit(version: Int, access: Int, name: String, signature: String, superName: String, interfaces: Array[String]): Unit = {
+ super.visit(version, access & ~ACC_FINAL, name, signature, superName, interfaces)
+ }
+ override def visitMethod(access: Int, name: String, desc: String, signature: String, exceptions: Array[String]): MethodVisitor = {
+ super.visitMethod(access & ~ACC_PRIVATE & ~ACC_PROTECTED & ~ACC_FINAL | ACC_PUBLIC, name, desc, signature, exceptions)
+ }
+ }, 0)
+ val patched = cw.toByteArray
+
+ defineClass.setAccessible(true)
+ defineClass.invoke(classLoader, s"org.hbase.async.$name", patched, valueOf(0), valueOf(patched.length)).asInstanceOf[Class[_]]
+ case None =>
+ throw new ClassNotFoundException(s"Could not find Asynchbase class: $name")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/555e59f9/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 2800f46..783fd8a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -37,6 +37,7 @@ import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.types.{HBaseType, VertexId}
import org.apache.s2graph.core.utils._
+import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
import org.hbase.async._
import scala.collection.JavaConversions._
@@ -51,6 +52,7 @@ object AsynchbaseStorage {
val edgeCf = Serializable.edgeCf
val emptyKVs = new util.ArrayList[KeyValue]()
+ AsynchbasePatcher.init()
def makeClient(config: Config, overrideKv: (String, String)*) = {
val asyncConfig: org.hbase.async.Config =
@@ -198,13 +200,13 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
label.schemaVersion match {
case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
- val scanner = client.newScanner(label.hbaseTableName.getBytes)
+ val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
scanner.setFamily(edgeCf)
/*
* TODO: remove this part.
*/
- val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
+ val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
@@ -246,19 +248,20 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
// SET option for this rpc properly.
scanner
case _ =>
- val get =
- if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
- else new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
+ val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+ new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
+ } else {
+ new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
+ }
get.maxVersions(1)
get.setFailfast(true)
- get.setMaxResultsPerColumnFamily(queryParam.limit)
- get.setRowOffsetPerColumnFamily(queryParam.offset)
get.setMinTimestamp(minTs)
get.setMaxTimestamp(maxTs)
get.setTimeout(queryParam.rpcTimeoutInMillis)
- if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter)
+ val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
+ get.setFilter(new FilterList(pagination +: Option(queryParam.columnRangeFilter).toSeq, MUST_PASS_ALL))
get
}
[2/2] incubator-s2graph git commit: [S2GRAPH-116] Use the official
Asynchbase release and stop managing our custom fork
Posted by st...@apache.org.
[S2GRAPH-116] Use the official Asynchbase release and stop managing our custom fork
JIRA:
[S2GRAPH-116] https://issues.apache.org/jira/browse/S2GRAPH-116
Pull Request:
Closes #85
Authors
Jong Wook Kim: jongwook@nyu.edu
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/38873aa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/38873aa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/38873aa0
Branch: refs/heads/master
Commit: 38873aa0816d904c40d991939cef7b64c14ea4a3
Parents: 555e59f
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Oct 1 16:25:23 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Oct 1 16:30:49 2016 +0900
----------------------------------------------------------------------
CHANGES | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/38873aa0/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0a92f05..106bc9f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -79,6 +79,10 @@ Release 0.1.0 - unreleased
S2GRAPH-82: Merge DeferCache and FutureCache (Committed by Daewon Jeong).
+ S2GRAPH-116: using ASM and ByteBuddy to add a proxy to Asynchbase's Scanner.
+ (Contributed by Jong Wook Kim <jo...@nyu.edu>, committed by DOYUNG YOON)
+
+
BUG FIXES
S2GRAPH-18: Query Option "interval" is Broken.