You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/09/12 14:37:50 UTC
incubator-s2graph git commit: [S2GRAPH-106]: Remove warnings while package.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 955d85e43 -> 224c7370a
[S2GRAPH-106]: Remove warnings while package.
JIRA:
[S2GRAPH-106] https://issues.apache.org/jira/browse/S2GRAPH-106
Pull Request:
Closes #77
Authors
DO YUNG YOON: steamshon@apache.org
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/224c7370
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/224c7370
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/224c7370
Branch: refs/heads/master
Commit: 224c7370a1e15da5e0b66b8e2c8bf1db2a62924f
Parents: 955d85e
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Sep 12 23:40:12 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Sep 12 23:40:12 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
build.sbt | 6 ++-
loader/build.sbt | 6 ++-
.../s2graph/loader/spark/HBaseContext.scala | 10 ++--
.../s2graph/loader/spark/JavaHBaseContext.scala | 10 ++--
.../loader/subscriber/TransferToHFile.scala | 13 +++--
.../loader/subscriber/WalLogToHDFS.scala | 2 +-
s2core/build.sbt | 12 ++---
.../scala/org/apache/s2graph/core/Edge.scala | 10 ++--
.../apache/s2graph/core/ExceptionHandler.scala | 2 +-
.../scala/org/apache/s2graph/core/Graph.scala | 10 ++--
.../org/apache/s2graph/core/Management.scala | 6 +--
.../org/apache/s2graph/core/PostProcess.scala | 4 +-
.../org/apache/s2graph/core/QueryParam.scala | 2 +-
.../org/apache/s2graph/core/mysqls/Bucket.scala | 2 +-
.../org/apache/s2graph/core/mysqls/Label.scala | 4 +-
.../s2graph/core/rest/RequestParser.scala | 2 +-
.../apache/s2graph/core/storage/SKeyValue.scala | 2 +-
.../apache/s2graph/core/storage/Storage.scala | 34 ++++++------
.../core/storage/hbase/AsynchbaseStorage.scala | 4 +-
.../tall/IndexEdgeDeserializable.scala | 6 +--
.../indexedge/tall/IndexEdgeSerializable.scala | 2 +-
.../wide/IndexEdgeDeserializable.scala | 6 +--
.../tall/SnapshotEdgeDeserializable.scala | 2 +-
.../apache/s2graph/core/types/VertexId.scala | 6 +--
.../apache/s2graph/core/types/v2/InnerVal.scala | 4 +-
.../counter/MethodNotSupportedException.scala | 3 ++
.../s2graph/counter/helper/WithHBase.scala | 2 +-
.../org/apache/s2graph/counter/package.scala | 2 -
s2counter_loader/build.sbt | 2 +
.../counter/loader/CounterBulkLoader.scala | 2 +-
.../loader/core/CounterEtlFunctions.scala | 2 +-
.../counter/loader/core/CounterFunctions.scala | 10 ++--
.../counter/loader/stream/EtlStreaming.scala | 8 +--
.../loader/stream/GraphToETLStreaming.scala | 2 +-
.../org/apache/s2graph/rest/netty/Server.scala | 55 ++++++++++----------
.../apache/s2graph/rest/play/Bootstrap.scala | 2 +-
.../play/controllers/CounterController.scala | 4 +-
.../rest/play/controllers/EdgeController.scala | 2 +-
39 files changed, 138 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 353b716..91cc36f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -181,6 +181,8 @@ Release 0.12.1 - unreleased
S2GRAPH-104: force scalaz-stream, netty-http-pipelining dependencies version which is available on maven central (Committed by DOYUNG YOON).
+ S2GRAPH-106: Remove warnings while package (Committed by DOYUNG YOON).
+
TEST
S2GRAPH-21: Change PostProcessBenchmarkSpec not to store and fetch test data from storage. (Committed by DOYUNG YOON).
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 7a2511d..b74e191 100755
--- a/build.sbt
+++ b/build.sbt
@@ -26,7 +26,7 @@ lazy val commonSettings = Seq(
organization := "org.apache.s2graph",
scalaVersion := "2.11.7",
version := "0.1.0",
- scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint"),
+ scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint", "-Xlint:-missing-interpolator"),
javaOptions ++= collection.JavaConversions.propertiesAsScalaMap(System.getProperties).map { case (key, value) => "-D" + key + "=" + value }.toSeq,
testOptions in Test += Tests.Argument("-oDF"),
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
@@ -91,4 +91,6 @@ releaseProcess := Seq[ReleaseStep](
commitNextVersion
)
-releasePublishArtifactsAction := PgpKeys.publishSigned.value
\ No newline at end of file
+releasePublishArtifactsAction := PgpKeys.publishSigned.value
+
+mainClass in (Compile) := None
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/loader/build.sbt
----------------------------------------------------------------------
diff --git a/loader/build.sbt b/loader/build.sbt
index b74f305..75e268e 100644
--- a/loader/build.sbt
+++ b/loader/build.sbt
@@ -24,7 +24,7 @@ name := "s2loader"
scalacOptions ++= Seq("-deprecation")
projectDependencies := Seq(
- (projectID in "s2core").value exclude("org.mortbay.jetty", "*") exclude("javax.xml.stream", "*") exclude("javax.servlet", "*")
+ (projectID in "s2core").value exclude("org.mortbay.jetty", "j*") exclude("javax.xml.stream", "s*") exclude("javax.servlet", "s*") exclude("javax.servlet", "j*")
)
libraryDependencies ++= Seq(
@@ -55,4 +55,6 @@ excludedJars in assembly := {
test in assembly := {}
-parallelExecution in Test := false
\ No newline at end of file
+parallelExecution in Test := false
+
+mainClass in (Compile) := None
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
index 94e12af..1f68dc2 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
@@ -55,8 +55,8 @@ import scala.collection.mutable
* of disseminating the configuration information
* to the working and managing the life cycle of HConnections.
*/
-class HBaseContext(@transient sc: SparkContext,
- @transient config: Configuration,
+class HBaseContext(@transient private val sc: SparkContext,
+ @transient private val config: Configuration,
val tmpHdfsConfgFile: String = null)
extends Serializable with Logging {
@@ -632,7 +632,7 @@ class HBaseContext(@transient sc: SparkContext,
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
}
- /**
+ /*
* This will return a new HFile writer when requested
*
* @param family column family
@@ -699,7 +699,7 @@ class HBaseContext(@transient sc: SparkContext,
var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
var rollOverRequested = false
- /**
+ /*
* This will roll all writers
*/
def rollWriters(): Unit = {
@@ -714,7 +714,7 @@ class HBaseContext(@transient sc: SparkContext,
rollOverRequested = false
}
- /**
+ /*
* This function will close a given HFile writer
* @param w The writer to close
*/
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala
index edfc635..81336d1 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala
@@ -36,8 +36,8 @@ import scala.reflect.ClassTag
* @param jsc This is the JavaSparkContext that we will wrap
* @param config This is the config information to out HBase cluster
*/
-class JavaHBaseContext(@transient jsc: JavaSparkContext,
- @transient config: Configuration) extends Serializable {
+class JavaHBaseContext(@transient private val jsc: JavaSparkContext,
+ @transient private val config: Configuration) extends Serializable {
val hbaseContext = new HBaseContext(jsc.sc, config)
/**
@@ -78,7 +78,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
def foreachPartition[T](javaDstream: JavaDStream[T],
f: VoidFunction[(Iterator[T], Connection)]) = {
hbaseContext.foreachPartition(javaDstream.dstream,
- (it: Iterator[T], conn: Connection) => f.call(it, conn))
+ (it: Iterator[T], conn: Connection) => f.call((it, conn)))
}
/**
@@ -138,7 +138,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
JavaDStream[U] = {
JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
(it: Iterator[T], conn: Connection) =>
- mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U])
+ mp.call((it, conn)))(fakeClassTag[U]))(fakeClassTag[U])
}
/**
@@ -307,7 +307,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
hbaseContext.hbaseRDD[U](tableName,
scans,
(v: (ImmutableBytesWritable, Result)) =>
- f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
+ f.call((v._1, v._2)))(fakeClassTag[U]))(fakeClassTag[U])
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 06079a7..f010328 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -65,10 +65,13 @@ object TransferToHFile extends SparkApp with JSONParser {
}
def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = {
+ val filtered = msgs.filter { case msg =>
+ val tokens = GraphUtil.split(msg)
+ tokens(2) == "e" || tokens(2) == "edge"
+ }
for {
- msg <- msgs
+ msg <- filtered
tokens = GraphUtil.split(msg)
- if tokens(2) == "e" || tokens(2) == "edge"
tempDirection = if (tokens.length == 7) "out" else tokens(7)
direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
reverseDirection = if (direction == "out") "in" else "out"
@@ -157,7 +160,7 @@ object TransferToHFile extends SparkApp with JSONParser {
GraphSubscriberHelper.management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
- /** set up hbase init */
+ /* set up hbase init */
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
@@ -199,8 +202,8 @@ object TransferToHFile extends SparkApp with JSONParser {
val hbaseSc = new HBaseContext(sc, hbaseConf)
def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
- val k = new KeyFamilyQualifier(kv.getRow(), kv.getFamily(), kv.getQualifier())
- val v = kv.getValue()
+ val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
+ val v = CellUtil.cloneValue(kv)
Seq((k -> v)).toIterator
}
val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
index 348c81d..a8fc4df 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
@@ -117,7 +117,7 @@ object WalLogToHDFS extends SparkApp with WithKafka {
val ts = time.milliseconds
val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
- /** make sure that `elements` are not running at the same time */
+ /* make sure that `elements` are not running at the same time */
val elementsWritten = {
elements.cache()
(Array("all") ++ splits).foreach {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index d8e510c..bcaac44 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -27,12 +27,12 @@ libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-json" % Common.playVersion,
"com.typesafe.akka" %% "akka-actor" % "2.3.4",
"com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
- "org.apache.hbase" % "hbase-client" % Common.hbaseVersion exclude("org.slf4j", "*"),
- "org.apache.hbase" % "hbase-common" % Common.hbaseVersion exclude("org.slf4j", "*"),
- "org.apache.hbase" % "hbase-server" % Common.hbaseVersion exclude("org.slf4j", "*") exclude("com.google.protobuf", "*"),
- "org.apache.hbase" % "hbase-hadoop-compat" % Common.hbaseVersion exclude("org.slf4j", "*"),
- "org.apache.hbase" % "hbase-hadoop2-compat" % Common.hbaseVersion exclude("org.slf4j", "*"),
- "org.apache.kafka" % "kafka-clients" % "0.8.2.0" exclude("org.slf4j", "*") exclude("com.sun.jdmk", "*") exclude("com.sun.jmx", "*") exclude("javax.jms", "*"),
+ "org.apache.hbase" % "hbase-client" % Common.hbaseVersion exclude("org.slf4j", "slf4j*"),
+ "org.apache.hbase" % "hbase-common" % Common.hbaseVersion exclude("org.slf4j", "slf4j*"),
+ "org.apache.hbase" % "hbase-server" % Common.hbaseVersion exclude("org.slf4j", "slf4j*") exclude("com.google.protobuf", "protobuf*"),
+ "org.apache.hbase" % "hbase-hadoop-compat" % Common.hbaseVersion exclude("org.slf4j", "slf4j*"),
+ "org.apache.hbase" % "hbase-hadoop2-compat" % Common.hbaseVersion exclude("org.slf4j", "slf4j*"),
+ "org.apache.kafka" % "kafka-clients" % "0.8.2.0" exclude("org.slf4j", "slf4j*") exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
"commons-pool" % "commons-pool" % "1.6",
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
"org.scalikejdbc" %% "scalikejdbc" % "2.1.+",
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 979268b..0fab400 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -91,7 +91,7 @@ case class IndexEdge(srcVertex: Vertex,
props.get(k) match {
case None =>
- /**
+ /*
* TODO: agly hack
* now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
*/
@@ -283,9 +283,9 @@ case class Edge(srcVertex: Vertex,
def toLogString: String = {
val ret =
if (propsWithName.nonEmpty)
- List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName))
+ List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).map(_.toString)
else
- List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label)
+ List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label).map(_.toString)
ret.mkString("\t")
}
@@ -371,12 +371,12 @@ object Edge extends JSONParser {
for {
(requestEdge, func) <- requestWithFuncs
} {
- val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer)
+ val (_newPropsWithTs, _) = func((prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer))
prevPropsWithTs = _newPropsWithTs
// logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
}
val requestTs = requestEdge.ts
- /** version should be monotoniously increasing so our RPC mutation should be applied safely */
+ /* version should be monotoniously increasing so our RPC mutation should be applied safely */
val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
val maxTs = prevPropsWithTs.map(_._2.ts).max
val newTs = if (maxTs > requestTs) maxTs else requestTs
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index 0af9ce5..29fc2dd 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -85,7 +85,7 @@ object ExceptionHandler {
def kafkaConfig(config: Config) = {
val props = new Properties();
- /** all default configuration for new producer */
+ /* all default configuration for new producer */
val brokers =
if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
else "localhost"
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index e5aa6eb..c25b71a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -103,7 +103,7 @@ object Graph {
}
def processTimeDecay(queryParam: QueryParam, edge: Edge) = {
- /** process time decay */
+ /* process time decay */
val tsVal = queryParam.timeDecay match {
case None => 1.0
case Some(timeDecay) =>
@@ -141,7 +141,7 @@ object Graph {
queryParam: QueryParam,
convertedEdge: Edge) = {
- /** skip duplicate policy check if consistencyLevel is strong */
+ /* skip duplicate policy check if consistencyLevel is strong */
if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) {
val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey)
//TODO:
@@ -245,7 +245,7 @@ object Graph {
val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
- /** check if this edge should be exlcuded. */
+ /* check if this edge should be exlcuded. */
if (shouldBeExcluded && !isDegree) {
edgesToExclude.add(filterHashKey)
} else {
@@ -260,7 +260,7 @@ object Graph {
convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
- /** check if this edge should be exlcuded. */
+ /* check if this edge should be exlcuded. */
if (shouldBeExcluded && !isDegree) {
edgesToExclude.add(filterHashKey)
} else {
@@ -294,7 +294,7 @@ object Graph {
val parts = GraphUtil.split(s)
val logType = parts(2)
val element = if (logType == "edge" | logType == "e") {
- /** current only edge is considered to be bulk loaded */
+ /* current only edge is considered to be bulk loaded */
labelMapping.get(parts(5)) match {
case None =>
case Some(toReplace) =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index f9b7431..2c77d4b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -308,7 +308,7 @@ class Management(graph: Graph) {
Model withTx { implicit session =>
val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
- /** create hbase table for service */
+ /* create hbase table for service */
storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
service
}
@@ -340,14 +340,14 @@ class Management(graph: Graph) {
case Some(l) =>
throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
case None =>
- /** create all models */
+ /* create all models */
val newLabel = Label.insertAll(label,
srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType,
isDirected, serviceName, indices, props, consistencyLevel,
hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
- /** create hbase table */
+ /* create hbase table */
val service = newLabel.service
(hTableName, hTableTTL) match {
case (None, None) => // do nothing
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 5bfaad7..caaa408 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -241,7 +241,7 @@ object PostProcess extends JSONParser {
}
}
- /** build result jsons */
+ /* build result jsons */
for {
queryRequestWithResult <- queryRequestWithResultLs
(queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
@@ -521,7 +521,6 @@ object PostProcess extends JSONParser {
props.toMap
}
- @deprecated(message = "deprecated", since = "0.2")
def propsToJson(edge: Edge) = {
for {
(seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
@@ -532,7 +531,6 @@ object PostProcess extends JSONParser {
}
}
- @deprecated(message = "deprecated", since = "0.2")
def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = {
val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index a5d7517..28d78a9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -381,7 +381,7 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System
}
def limit(offset: Int, limit: Int): QueryParam = {
- /** since degree info is located on first always */
+ /* since degree info is located on first always */
if (offset == 0 && this.columnRangeFilter == null) {
this.limit = limit + 1
this.offset = offset
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
index 7626ead..bcd5f0a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
@@ -53,7 +53,7 @@ object Bucket extends Model[Bucket] {
def toRange(str: String): Option[(Int, Int)] = {
val range = str.split(rangeDelimiter)
- if (range.length == 2) Option(range.head.toInt, range.last.toInt)
+ if (range.length == 2) Option((range.head.toInt, range.last.toInt))
else None
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 6e17793..90d3d67 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -174,14 +174,14 @@ object Label extends Model[Label] {
val tgtServiceId = tgtService.id.get
val serviceId = service.id.get
- /** insert serviceColumn */
+ /* insert serviceColumn */
val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion)
val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion)
if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
- /** create label */
+ /* create label */
Label.findByName(labelName, useCache = false).getOrElse {
val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index afda6f9..61db2ef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -263,7 +263,7 @@ class RequestParser(config: Config) extends JSONParser {
for {
idVal <- idOpt ++ idsOpt.toSeq.flatten
- /** bug, need to use labels schemaVersion */
+ /* bug, need to use labels schemaVersion */
innerVal <- jsValueToInnerVal(idVal, col.columnType, col.schemaVersion)
} yield {
Vertex(SourceVertexId(col.id.get, innerVal), System.currentTimeMillis())
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index b690307..c76c25c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -39,7 +39,7 @@ case class SKeyValue(table: Array[Byte],
def toLogString = {
Map("table" -> table.toList, "row" -> row.toList, "cf" -> Bytes.toString(cf),
"qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp,
- "operation" -> operation).toString
+ "operation" -> operation).mapValues(_.toString).toString
}
override def toString(): String = toLogString
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index e52c579..70e47a7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -442,7 +442,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
future recoverWith {
case FetchTimeoutException(retryEdge) =>
logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
- /** fetch failed. re-fetch should be done */
+ /* fetch failed. re-fetch should be done */
fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -458,14 +458,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
- /** retry logic */
+ /* retry logic */
val promise = Promise[Boolean]
val backOff = exponentialBackOff(tryNum)
scheduledThreadPool.schedule(new Runnable {
override def run(): Unit = {
val future = if (failedStatusCode == 0) {
// acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
- /** fetch failed. re-fetch should be done */
+ /* fetch failed. re-fetch should be done */
fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -498,7 +498,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case 0 =>
fetchedSnapshotEdgeOpt match {
case None =>
- /**
+ /*
* no one has never mutated this SN.
* (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
* pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
@@ -520,7 +520,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case Some(snapshotEdge) =>
snapshotEdge.pendingEdgeOpt match {
case None =>
- /**
+ /*
* others finished commit on this SN. but there is no contention.
* (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
* pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
@@ -542,7 +542,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
case Some(pendingEdge) =>
val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
if (isLockExpired) {
- /**
+ /*
* if pendingEdge.ts == snapshotEdge.ts =>
* (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
* else =>
@@ -564,7 +564,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
} else {
- /**
+ /*
* others finished commit on this SN and there is currently contention.
* this can't be proceed so retry from re-fetch.
* throw EX
@@ -577,11 +577,11 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
case _ =>
- /**
+ /*
* statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock
*/
- /**
+ /*
* this succeed to lock this SN. keep doing on commit process.
* if SN.isEmpty =>
* no one never succed to commit on this SN.
@@ -831,7 +831,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
edgeWithScore <- queryResult.edgeWithScoreLs
(edge, score) = EdgeWithScore.unapply(edgeWithScore).get
} yield {
- /** reverted direction */
+ /* reverted direction */
val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
buildIncrementsAsync(indexEdge, -1L)
@@ -890,7 +890,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
label.schemaVersion match {
case HBaseType.VERSION3 | HBaseType.VERSION4 =>
if (label.consistencyLevel == "strong") {
- /**
+ /*
* read: snapshotEdge on queryResult = O(N)
* write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
*/
@@ -900,7 +900,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
}
case _ =>
- /**
+ /*
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
@@ -1090,7 +1090,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
case Some(tgtVertexId) => // _to is given.
- /** we use toSnapshotEdge so dont need to swap src, tgt */
+ /* we use toSnapshotEdge so dont need to swap src, tgt */
val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
(src, tgt)
@@ -1285,19 +1285,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
(edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
case (true, true) =>
- /** when there is no need to update. shouldUpdate == false */
+ /* when there is no need to update. shouldUpdate == false */
List.empty
case (true, false) =>
- /** no edges to delete but there is new edges to insert so increase degree by 1 */
+ /* no edges to delete but there is new edges to insert so increase degree by 1 */
edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
case (false, true) =>
- /** no edges to insert but there is old edges to delete so decrease degree by 1 */
+ /* no edges to insert but there is old edges to delete so decrease degree by 1 */
edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
case (false, false) =>
- /** update on existing edges so no change on degree */
+ /* update on existing edges so no change on degree */
List.empty
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 19d5cc8..2800f46 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -201,7 +201,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
val scanner = client.newScanner(label.hbaseTableName.getBytes)
scanner.setFamily(edgeCf)
- /**
+ /*
* TODO: remove this part.
*/
val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
@@ -221,7 +221,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
}
(_startKey, Bytes.add(baseKey, queryParam.columnRangeFilterMaxBytes))
} else {
- /**
+ /*
* note: since propsToBytes encode size of property map at first byte, we are sure about max value here
*/
val _startKey = queryParam.cursorOpt match {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index e6265f7..e2b7c2f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -121,7 +121,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
- /** process indexProps */
+ /* process indexProps */
for {
(seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
} {
@@ -129,7 +129,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
else allProps += seq -> InnerValLikeWithTs(v, version)
}
- /** process props */
+ /* process props */
if (op == GraphUtil.operations("incrementCount")) {
// val countVal = Bytes.toLong(kv.value)
val countVal = bytesToLongFunc(kv.value, 0)
@@ -145,7 +145,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
- /** process tgtVertexId */
+ /* process tgtVertexId */
val tgtVertexId =
mergedProps.get(LabelMeta.toSeq) match {
case None => tgtVertexIdRaw
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index f17e41c..a76bd1f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -51,7 +51,7 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
case Some(vId) => idxPropsBytes
}
- /** TODO search usage of op byte. if there is no, then remove opByte */
+ /* TODO search usage of op byte. if there is no, then remove opByte */
Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 5a8fa42..eb3d765 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -101,7 +101,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
- /** process indexProps */
+ /* process indexProps */
for {
(seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
} {
@@ -109,7 +109,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
else allProps += seq -> InnerValLikeWithTs(v, version)
}
- /** process props */
+ /* process props */
if (op == GraphUtil.operations("incrementCount")) {
// val countVal = Bytes.toLong(kv.value)
val countVal = bytesToLongFunc(kv.value, 0)
@@ -127,7 +127,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte
if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
- /** process tgtVertexId */
+ /* process tgtVertexId */
val tgtVertexId =
mergedProps.get(LabelMeta.toSeq) match {
case None => tgtVertexIdRaw
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 0380ec9..368e3f3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -44,7 +44,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
val kv = kvs.head
val schemaVer = queryParam.label.schemaVersion
val cellVersion = kv.timestamp
- /** rowKey */
+ /* rowKey */
def parseRowV3(kv: SKeyValue, version: String) = {
var pos = 0
val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
index 9fe57ea..24b30fb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
@@ -29,7 +29,7 @@ object VertexId extends HBaseDeserializable {
offset: Int,
len: Int,
version: String = DEFAULT_VERSION): (VertexId, Int) = {
- /** since murmur hash is prepended, skip numOfBytes for murmur hash */
+ /* since murmur hash is prepended, skip numOfBytes for murmur hash */
var pos = offset + GraphUtil.bytesForMurMurHash
val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true)
@@ -101,7 +101,7 @@ object SourceVertexId extends HBaseDeserializable {
offset: Int,
len: Int,
version: String = DEFAULT_VERSION): (VertexId, Int) = {
- /** since murmur hash is prepended, skip numOfBytes for murmur hash */
+ /* since murmur hash is prepended, skip numOfBytes for murmur hash */
val pos = offset + GraphUtil.bytesForMurMurHash
val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true)
@@ -122,7 +122,7 @@ object TargetVertexId extends HBaseDeserializable {
offset: Int,
len: Int,
version: String = DEFAULT_VERSION): (VertexId, Int) = {
- /** murmur has is not prepended so start from offset */
+ /* murmur has is not prepended so start from offset */
val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, version, isVertexId = true)
(TargetVertexId(DEFAULT_COL_ID, innerId), numOfBytesUsed)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala
index 7110681..9d9bdf2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala
@@ -38,7 +38,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
pbr.setPosition(offset)
val startPos = pbr.getPosition
if (bytes(offset) == -1 | bytes(offset) == 0) {
- /** simple boolean */
+ /* simple boolean */
val boolean = order match {
case Order.DESCENDING => bytes(offset) == 0
case _ => bytes(offset) == -1
@@ -73,7 +73,7 @@ case class InnerVal(value: Any) extends HBaseSerializable with InnerValLike {
val ret = value match {
case b: Boolean =>
- /** since OrderedBytes header start from 0x05, it is safe to use -1, 0
+ /* since OrderedBytes header start from 0x05, it is safe to use -1, 0
* for decreasing order (true, false) */
// Bytes.toBytes(b)
order match {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_core/src/main/scala/org/apache/s2graph/counter/MethodNotSupportedException.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/MethodNotSupportedException.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/MethodNotSupportedException.scala
new file mode 100644
index 0000000..16f2dd4
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/MethodNotSupportedException.scala
@@ -0,0 +1,3 @@
+package org.apache.s2graph.counter
+
+case class MethodNotSupportedException(message: String, cause: Throwable = null) extends Exception(message, cause)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala
index c83854e..a2031d5 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala
@@ -70,7 +70,7 @@ class WithHBase(config: Config) {
Try {
val table = conn.getTable(TableName.valueOf(tableName))
// do not keep failed operation in writer buffer
- table.setWriteBufferSize(writeBufferSize)
+// table.setWriteBufferSize(writeBufferSize)
try {
op(table)
} catch {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala
index d69e1b9..996e8ad 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala
@@ -22,6 +22,4 @@ package org.apache.s2graph
package object counter {
val VERSION_1: Byte = 1
val VERSION_2: Byte = 2
-
- case class MethodNotSupportedException(message: String, cause: Throwable = null) extends Exception(message, cause)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_loader/build.sbt
----------------------------------------------------------------------
diff --git a/s2counter_loader/build.sbt b/s2counter_loader/build.sbt
index d7a6123..8de6281 100644
--- a/s2counter_loader/build.sbt
+++ b/s2counter_loader/build.sbt
@@ -52,3 +52,5 @@ mergeStrategy in assembly := {
}
test in assembly := {}
+
+mainClass in (Compile) := None
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala
index a2d17a6..b24db15 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala
@@ -70,7 +70,7 @@ object CounterBulkLoader extends SparkApp with WithKafka {
sp = GraphUtil.split(line) if sp.size <= 7 || GraphUtil.split(line)(7) != "in"
item <- CounterEtlFunctions.parseEdgeFormat(line)
} yield {
- acc +=("Edges", 1)
+ acc += (("Edges", 1))
item
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
index cebdfdc..7dcf48a 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
@@ -45,7 +45,7 @@ object CounterEtlFunctions extends Logging {
}
def parseEdgeFormat(line: String): Option[CounterEtlItem] = {
- /**
+ /*
* 1427082276804 insert edge 19073318 52453027_93524145648511699 story_user_ch_doc_view {"doc_type" : "l", "channel_subscribing" : "y", "view_from" : "feed"}
*/
for {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
index 882247d..05423b1 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
@@ -389,8 +389,8 @@ object CounterFunctions extends Logging with WithKafka {
success <- exactCounter.insertBlobValue(policy, keys)
} yield {
success match {
- case true => acc += ("BLOB", 1)
- case false => acc += ("BLOBFailed", 1)
+ case true => acc += (("BLOB", 1))
+ case false => acc += (("BLOBFailed", 1))
}
}
}
@@ -411,8 +411,8 @@ object CounterFunctions extends Logging with WithKafka {
trxLog <- exactCounter.updateCount(policy, counts)
} yield {
trxLog.success match {
- case true => acc += (s"ExactV${policy.version}", 1)
- case false => acc += (s"ExactFailedV${policy.version}", 1)
+ case true => acc += ((s"ExactV${policy.version}", 1))
+ case false => acc += ((s"ExactFailedV${policy.version}", 1))
}
trxLog
}
@@ -446,7 +446,7 @@ object CounterFunctions extends Logging with WithKafka {
groupedValues <- allValues.grouped(10)
} {
rankingCounter.update(groupedValues, K_MAX)
- acc += (s"RankingV${policy.version}", groupedValues.length)
+ acc += ((s"RankingV${policy.version}", groupedValues.length))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala
index 20c0a70..49ce2e3 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala
@@ -72,7 +72,7 @@ object EtlStreaming extends SparkApp with WithKafka {
val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
- /**
+ /*
* read message from etl topic and join user profile from graph and then produce whole message to counter topic
*/
val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)
@@ -89,7 +89,7 @@ object EtlStreaming extends SparkApp with WithKafka {
line <- GraphUtil.parseString(v)
item <- CounterEtlFunctions.parseEdgeFormat(line)
} yield {
- acc += ("Edges", 1)
+ acc += (("Edges", 1))
item
}
}
@@ -105,7 +105,7 @@ object EtlStreaming extends SparkApp with WithKafka {
val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]]
joinItems.foreach { item =>
if (item.useProfile) {
- acc += ("ETL", 1)
+ acc += (("ETL", 1))
}
val k = getPartKey(item.item, 20)
val values: mutable.MutableList[CounterEtlItem] = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem])
@@ -114,7 +114,7 @@ object EtlStreaming extends SparkApp with WithKafka {
}
m.foreach { case (k, v) =>
v.map(_.toKafkaMessage).grouped(1000).foreach { grouped =>
- acc += ("Produce", grouped.size)
+ acc += (("Produce", grouped.size))
producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n")))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala
index c53391e..e0c0419 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala
@@ -56,7 +56,7 @@ object GraphToETLStreaming extends SparkApp with WithKafka {
val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
- /**
+ /*
* consume graphIn topic and produce messages to etl topic
* two purpose
* 1. partition by target vertex id
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index e980fc5..6ff050f 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -46,6 +46,7 @@ import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.io.Source
import scala.util.{Failure, Success, Try}
+import scala.language.existentials
class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] with JSONParser {
val ApplicationJson = "application/json"
@@ -169,9 +170,7 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
}
// Simple http server
-object NettyServer extends App {
- /** should be same with Boostrap.onStart on play */
-
+object NettyServer {
val numOfThread = Runtime.getRuntime.availableProcessors()
val threadPool = Executors.newFixedThreadPool(numOfThread)
val ec = ExecutionContext.fromExecutor(threadPool)
@@ -198,30 +197,32 @@ object NettyServer extends App {
(new NioEventLoopGroup(1), new NioEventLoopGroup(), classOf[NioServerSocketChannel])
}
- try {
- val b: ServerBootstrap = new ServerBootstrap()
- .option(ChannelOption.SO_BACKLOG, Int.box(2048))
-
- b.group(bossGroup, workerGroup).channel(channelClass)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer[SocketChannel] {
- override def initChannel(ch: SocketChannel) {
- val p = ch.pipeline()
- p.addLast(new HttpServerCodec())
- p.addLast(new HttpObjectAggregator(65536))
- p.addLast(new S2RestHandler(rest)(ec))
- }
- })
-
- // for check server is started
- logger.info(s"Listening for HTTP on /0.0.0.0:$port")
- val ch: Channel = b.bind(port).sync().channel()
- ch.closeFuture().sync()
-
- } finally {
- bossGroup.shutdownGracefully()
- workerGroup.shutdownGracefully()
- s2graph.shutdown()
+ def main(args : scala.Array[scala.Predef.String]) : scala.Unit = {
+ try {
+ val b: ServerBootstrap = new ServerBootstrap()
+ .option(ChannelOption.SO_BACKLOG, Int.box(2048))
+
+ b.group(bossGroup, workerGroup).channel(channelClass)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer[SocketChannel] {
+ override def initChannel(ch: SocketChannel) {
+ val p = ch.pipeline()
+ p.addLast(new HttpServerCodec())
+ p.addLast(new HttpObjectAggregator(65536))
+ p.addLast(new S2RestHandler(rest)(ec))
+ }
+ })
+
+ // for check server is started
+ logger.info(s"Listening for HTTP on /0.0.0.0:$port")
+ val ch: Channel = b.bind(port).sync().channel()
+ ch.closeFuture().sync()
+
+ } finally {
+ bossGroup.shutdownGracefully()
+ workerGroup.shutdownGracefully()
+ s2graph.shutdown()
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
index 686f6e9..570c23c 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -77,7 +77,7 @@ object Global extends WithFilters(new GzipFilter()) {
ExceptionHandler.shutdown()
}
- /**
+ /*
* shutdown hbase client for flush buffers.
*/
s2graph.shutdown()
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
index 5a4f777..96f7e60 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
@@ -334,7 +334,7 @@ object CounterController extends Controller {
}
}
- /**
+ /*
* [{
* "service": , "action", "itemIds": [], "interval": string, "limit": int, "from": ts, "to": ts,
* "dimensions": [{"key": list[String]}]
@@ -720,7 +720,7 @@ object CounterController extends Controller {
def incrementCount(service: String, action: String, item: String) = Action.async(s2parse.json) { request =>
Future {
- /**
+ /*
* {
* timestamp: Long
* property: {}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/224c7370/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 68d6dfe..478df99 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -195,7 +195,7 @@ object EdgeController extends Controller {
def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
- /** logging for delete all request */
+ /* logging for delete all request */
def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = {
val kafkaMessages = for {
id <- ids