You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/07/31 01:05:32 UTC
[18/25] incubator-s2graph git commit: move constants from
IndexProvider to GlobalIndex.
move constants from IndexProvider to GlobalIndex.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/cc713578
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/cc713578
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/cc713578
Branch: refs/heads/master
Commit: cc71357825dc5b9cafc765b922fe3083fb6e2119
Parents: 30bf575 3725464
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Jul 29 08:04:01 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Jul 29 08:04:01 2017 +0900
----------------------------------------------------------------------
.travis/install-hbase.sh | 2 +-
.../scala/org/apache/s2graph/core/Management.scala | 2 +-
.../apache/s2graph/core/index/IndexProvider.scala | 9 ++-------
.../apache/s2graph/core/mysqls/GlobalIndex.scala | 16 ++++++++++++++--
.../s2graph/core/tinkerpop/S2GraphProvider.scala | 6 +-----
5 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/.travis/install-hbase.sh
----------------------------------------------------------------------
diff --cc .travis/install-hbase.sh
index f55437c,f55437c..f2ba5d3
--- a/.travis/install-hbase.sh
+++ b/.travis/install-hbase.sh
@@@ -17,5 -17,5 +17,5 @@@
set -xe
if [ ! -d "$HOME/hbase-$HBASE_VERSION/bin" ]; then
-- cd $HOME && wget -q -O - http://mirror.navercorp.com/apache/hbase/stable/hbase-$HBASE_VERSION-bin.tar.gz | tar xz
++ cd $HOME && wget -q -O - http://mirror.navercorp.com/apache/hbase/stable/hbase-*-bin.tar.gz | tar xz
fi
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index 7f2602f,0000000..57f384c
mode 100644,000000..100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@@ -1,325 -1,0 +1,320 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.index
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.lucene.analysis.standard.StandardAnalyzer
+import org.apache.lucene.document._
+import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig}
+import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
+import org.apache.lucene.search.IndexSearcher
+import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
+import org.apache.s2graph.core.io.Conversions
+import org.apache.s2graph.core.{EdgeId, S2Edge, S2Vertex}
+import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.types.{InnerValLike, VertexId}
+import org.apache.s2graph.core.utils.logger
+import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains, P}
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
+import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP}
+import org.apache.tinkerpop.gremlin.structure.T
+import play.api.libs.json.Json
+
+import scala.concurrent.Future
+
+object IndexProvider {
- val vidField = "_vid_"
- val eidField = "_eid_"
- val labelField = "_label_"
- val serviceField = "_service_"
- val serviceColumnField = "_serviceColumn_"
-
- val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
++ import GlobalIndex._
+ val hitsPerPage = 100000
+
+ def apply(config: Config): IndexProvider = {
+ val indexProviderType = "lucene"
+// if (config.hasPath("index.provider")) config.getString("index.provider") else "lucene"
+
+ indexProviderType match {
+ case "lucene" => new LuceneIndexProvider(config)
+ }
+ }
+
+ def buildQuerySingleString(container: HasContainer): String = {
+ import scala.collection.JavaConversions._
+
+ val key = if (container.getKey == T.label.getAccessor) labelField else container.getKey
+ val value = container.getValue
+
+ container.getPredicate match {
+ case and: AndP[_] =>
+ val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
+ and.getPredicates.foreach { p =>
+ buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p)))
+ }
+ buffer.mkString("(", " AND ", ")")
+ case or: OrP[_] =>
+ val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
+ or.getPredicates.foreach { p =>
+ buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p)))
+ }
+ buffer.mkString("(", " OR ", ")")
+ case _ =>
+ val biPredicate = container.getBiPredicate
+ biPredicate match {
+
+ case Contains.within =>
+ key + ":(" + value.asInstanceOf[util.Collection[_]].toSeq.mkString(" OR ") + ")"
+ case Contains.without =>
+ "NOT " + key + ":(" + value.asInstanceOf[util.Collection[_]].toSeq.mkString(" AND ") + ")"
+ case Compare.eq => s"${key}:${value}"
+ case Compare.gt => s"(${key}:[${value} TO *] AND NOT ${key}:${value})"
+ case Compare.gte => s"${key}:[${value} TO *]"
+ case Compare.lt => s"${key}:[* TO ${value}]"
+ case Compare.lte => s"(${key}:[* TO ${value}] OR ${key}:${value})"
+ case Compare.neq => s"NOT ${key}:${value}"
+ case _ => throw new IllegalArgumentException("not supported yet.")
+ }
+ }
+ }
+
+ def buildQueryString(hasContainers: java.util.List[HasContainer]): String = {
+ import scala.collection.JavaConversions._
+ val builder = scala.collection.mutable.ArrayBuffer.empty[String]
+
+ hasContainers.foreach { container =>
+ container.getPredicate match {
+ case and: AndP[_] =>
+ val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
+ and.getPredicates.foreach { p =>
+ buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p)))
+ }
+ builder.append(buffer.mkString("(", " AND ", ")"))
+ case or: OrP[_] =>
+ val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
+ or.getPredicates.foreach { p =>
+ buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p)))
+ }
+ builder.append(buffer.mkString("(", " OR ", ")"))
+ case _ =>
+ builder.append(buildQuerySingleString(container))
+ }
+ }
+
+ builder.mkString(" AND ")
+ }
+}
+
+trait IndexProvider {
+ //TODO: Seq needs to be changed into stream
+ def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId]
+ def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[EdgeId]]
+
+ def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId]
+ def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[VertexId]]
+
+ def mutateVertices(vertices: Seq[S2Vertex]): Seq[Boolean]
+ def mutateVerticesAsync(vertices: Seq[S2Vertex]): Future[Seq[Boolean]]
+
+ def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean]
+ def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]]
+
+ def shutdown(): Unit
+}
+
+class LuceneIndexProvider(config: Config) extends IndexProvider {
+ import IndexProvider._
+ import scala.collection.mutable
+ import scala.collection.JavaConverters._
++ import GlobalIndex._
+
+ val analyzer = new StandardAnalyzer()
+ val writers = mutable.Map.empty[String, IndexWriter]
+ val directories = mutable.Map.empty[String, BaseDirectory]
+
+ private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = {
+ writers.getOrElseUpdate(indexName, {
+ val dir = directories.getOrElseUpdate(indexName, new RAMDirectory())
+ val indexConfig = new IndexWriterConfig(analyzer)
+ new IndexWriter(dir, indexConfig)
+ })
+ }
+
+ private def toDocument(globalIndex: GlobalIndex, vertex: S2Vertex): Option[Document] = {
+ val props = vertex.props.asScala
+ val exist = props.exists(t => globalIndex.propNamesSet(t._1))
+ if (!exist) None
+ else {
+ val doc = new Document()
+ val id = vertex.id.toString
+
+ doc.add(new StringField(vidField, id, Field.Store.YES))
+ doc.add(new StringField(serviceField, vertex.serviceName, Field.Store.YES))
+ doc.add(new StringField(serviceColumnField, vertex.columnName, Field.Store.YES))
+
+ props.foreach { case (dim, s2VertexProperty) =>
+ val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO
+ val field = s2VertexProperty.columnMeta.dataType match {
+ case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex)
+ case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex)
+ }
+ doc.add(field)
+ }
+
+ Option(doc)
+ }
+ }
+
+ private def toDocument(globalIndex: GlobalIndex, edge: S2Edge): Option[Document] = {
+ val props = edge.propsWithTs.asScala
+ val exist = props.exists(t => globalIndex.propNamesSet(t._1))
+ if (!exist) None
+ else {
+ val doc = new Document()
+ val id = edge.edgeId.toString
+
+ doc.add(new StringField(eidField, id, Field.Store.YES))
+ doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES))
+ doc.add(new StringField(labelField, edge.label(), Field.Store.YES))
+
+ props.foreach { case (dim, s2Property) =>
+ val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO
+ val field = s2Property.labelMeta.dataType match {
+ case "string" => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex)
+ case _ => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex)
+ }
+ doc.add(field)
+ }
+
+ Option(doc)
+ }
+ }
+
+ override def mutateVertices(vertices: Seq[S2Vertex]): Seq[Boolean] = {
+ val globalIndexOptions = GlobalIndex.findAll()
+
+ globalIndexOptions.map { globalIndex =>
+ val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
+
+ vertices.foreach { vertex =>
+ toDocument(globalIndex, vertex).foreach { doc =>
+ writer.addDocument(doc)
+ }
+ }
+
+ writer.commit()
+ }
+
+ vertices.map(_ => true)
+ }
+
+ override def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] = {
+ val globalIndexOptions = GlobalIndex.findAll()
+
+ globalIndexOptions.map { globalIndex =>
+ val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
+
+ edges.foreach { edge =>
+ toDocument(globalIndex, edge).foreach { doc =>
+ writer.addDocument(doc)
+ }
+ }
+
+ writer.commit()
+ }
+
+ edges.map(_ => true)
+ }
+
+ override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = {
+ val field = eidField
+ val ids = new java.util.HashSet[EdgeId]
+
+ GlobalIndex.findGlobalIndex(hasContainers).map { globalIndex =>
+ val queryString = buildQueryString(hasContainers)
+
+ try {
+ val q = new QueryParser(field, analyzer).parse(queryString)
+
+ val reader = DirectoryReader.open(directories(globalIndex.indexName))
+ val searcher = new IndexSearcher(reader)
+
+ val docs = searcher.search(q, hitsPerPage)
+
+ docs.scoreDocs.foreach { scoreDoc =>
+ val document = searcher.doc(scoreDoc.doc)
+ val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get
+ ids.add(id);
+ }
+
+ reader.close()
+ ids
+ } catch {
+ case ex: ParseException =>
+ logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
+ ids
+ }
+ }
+
+ new util.ArrayList[EdgeId](ids)
+ }
+
+ override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = {
+ val field = vidField
+ val ids = new java.util.HashSet[VertexId]
+
+ GlobalIndex.findGlobalIndex(hasContainers).map { globalIndex =>
+ val queryString = buildQueryString(hasContainers)
+
+ try {
+ val q = new QueryParser(field, analyzer).parse(queryString)
+
+ val reader = DirectoryReader.open(directories(globalIndex.indexName))
+ val searcher = new IndexSearcher(reader)
+
+ val docs = searcher.search(q, hitsPerPage)
+
+ docs.scoreDocs.foreach { scoreDoc =>
+ val document = searcher.doc(scoreDoc.doc)
+ val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get
+ ids.add(id)
+ }
+
+ reader.close()
+ ids
+ } catch {
+ case ex: ParseException =>
+ logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
+ ids
+ }
+ }
+
+ new util.ArrayList[VertexId](ids)
+ }
+
+ override def shutdown(): Unit = {
+ writers.foreach { case (_, writer) => writer.close() }
+ }
+
+ override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
+
+ override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers))
+
+ override def mutateVerticesAsync(vertices: Seq[S2Vertex]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices))
+
+ override def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges))
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
index bb18949,1d1dfe2..347c083
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
@@@ -6,9 -6,9 +6,15 @@@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
++<<<<<<< HEAD
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
++=======
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
++>>>>>>> S2GRAPH-152
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
----------------------------------------------------------------------
diff --cc s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index 3157e18,87eac0e..d8367d7
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@@ -19,9 -19,9 +19,6 @@@
package org.apache.s2graph.core.tinkerpop
--import java.util
--
--import com.typesafe.config.ConfigFactory
import org.apache.commons.configuration.Configuration
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.Management.JsonModel.Prop
@@@ -34,7 -33,7 +30,7 @@@ import org.apache.s2graph.core.utils.lo
import org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData
import org.apache.tinkerpop.gremlin.structure.{Element, Graph, T}
import org.apache.tinkerpop.gremlin.{AbstractGraphProvider, LoadGraphWith}
--
++import java.util
import scala.collection.JavaConverters._
object S2GraphProvider {