You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/07/31 01:05:32 UTC

[18/25] incubator-s2graph git commit: move constants from IndexProvider to GlobalIndex.

move constants from IndexProvider to GlobalIndex.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/cc713578
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/cc713578
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/cc713578

Branch: refs/heads/master
Commit: cc71357825dc5b9cafc765b922fe3083fb6e2119
Parents: 30bf575 3725464
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Jul 29 08:04:01 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Jul 29 08:04:01 2017 +0900

----------------------------------------------------------------------
 .travis/install-hbase.sh                            |  2 +-
 .../scala/org/apache/s2graph/core/Management.scala  |  2 +-
 .../apache/s2graph/core/index/IndexProvider.scala   |  9 ++-------
 .../apache/s2graph/core/mysqls/GlobalIndex.scala    | 16 ++++++++++++++--
 .../s2graph/core/tinkerpop/S2GraphProvider.scala    |  6 +-----
 5 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/.travis/install-hbase.sh
----------------------------------------------------------------------
diff --cc .travis/install-hbase.sh
index f55437c,f55437c..f2ba5d3
--- a/.travis/install-hbase.sh
+++ b/.travis/install-hbase.sh
@@@ -17,5 -17,5 +17,5 @@@
  set -xe
  
  if [ ! -d "$HOME/hbase-$HBASE_VERSION/bin" ]; then
--  cd $HOME && wget -q -O - http://mirror.navercorp.com/apache/hbase/stable/hbase-$HBASE_VERSION-bin.tar.gz | tar xz
++  cd $HOME && wget -q -O - http://mirror.navercorp.com/apache/hbase/stable/hbase-*-bin.tar.gz | tar xz
  fi

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index 7f2602f,0000000..57f384c
mode 100644,000000..100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@@ -1,325 -1,0 +1,320 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.s2graph.core.index
 +
 +import java.util
 +
 +import com.typesafe.config.Config
 +import org.apache.lucene.analysis.standard.StandardAnalyzer
 +import org.apache.lucene.document._
 +import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig}
 +import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
 +import org.apache.lucene.search.IndexSearcher
 +import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
 +import org.apache.s2graph.core.io.Conversions
 +import org.apache.s2graph.core.{EdgeId, S2Edge, S2Vertex}
 +import org.apache.s2graph.core.mysqls._
 +import org.apache.s2graph.core.types.{InnerValLike, VertexId}
 +import org.apache.s2graph.core.utils.logger
 +import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains, P}
 +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 +import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP}
 +import org.apache.tinkerpop.gremlin.structure.T
 +import play.api.libs.json.Json
 +
 +import scala.concurrent.Future
 +
 +object IndexProvider {
-   val vidField = "_vid_"
-   val eidField = "_eid_"
-   val labelField = "_label_"
-   val serviceField = "_service_"
-   val serviceColumnField = "_serviceColumn_"
- 
-   val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
++  import GlobalIndex._
 +  val hitsPerPage = 100000
 +
 +  def apply(config: Config): IndexProvider = {
 +    val indexProviderType = "lucene"
 +//      if (config.hasPath("index.provider")) config.getString("index.provider") else "lucene"
 +
 +    indexProviderType match {
 +      case "lucene" => new LuceneIndexProvider(config)
 +    }
 +  }
 +
 +  def buildQuerySingleString(container: HasContainer): String = {
 +    import scala.collection.JavaConversions._
 +
 +    val key = if (container.getKey == T.label.getAccessor) labelField else container.getKey
 +    val value = container.getValue
 +
 +    container.getPredicate match {
 +      case and: AndP[_] =>
 +        val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
 +        and.getPredicates.foreach { p =>
 +          buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p)))
 +        }
 +        buffer.mkString("(", " AND ", ")")
 +      case or: OrP[_] =>
 +        val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
 +        or.getPredicates.foreach { p =>
 +          buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p)))
 +        }
 +        buffer.mkString("(", " OR ", ")")
 +      case _ =>
 +        val biPredicate = container.getBiPredicate
 +        biPredicate match {
 +
 +          case Contains.within =>
 +            key + ":(" + value.asInstanceOf[util.Collection[_]].toSeq.mkString(" OR ") + ")"
 +          case Contains.without =>
 +            "NOT " + key + ":(" + value.asInstanceOf[util.Collection[_]].toSeq.mkString(" AND ") + ")"
 +          case Compare.eq => s"${key}:${value}"
 +          case Compare.gt => s"(${key}:[${value} TO *] AND NOT ${key}:${value})"
 +          case Compare.gte => s"${key}:[${value} TO *]"
 +          case Compare.lt => s"${key}:[* TO ${value}]"
 +          case Compare.lte => s"(${key}:[* TO ${value}] OR ${key}:${value})"
 +          case Compare.neq => s"NOT ${key}:${value}"
 +          case _ => throw new IllegalArgumentException("not supported yet.")
 +        }
 +    }
 +  }
 +
 +  def buildQueryString(hasContainers: java.util.List[HasContainer]): String = {
 +    import scala.collection.JavaConversions._
 +    val builder = scala.collection.mutable.ArrayBuffer.empty[String]
 +
 +    hasContainers.foreach { container =>
 +      container.getPredicate match {
 +        case and: AndP[_] =>
 +          val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
 +          and.getPredicates.foreach { p =>
 +            buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p)))
 +          }
 +          builder.append(buffer.mkString("(", " AND ", ")"))
 +        case or: OrP[_] =>
 +          val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
 +          or.getPredicates.foreach { p =>
 +            buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p)))
 +          }
 +          builder.append(buffer.mkString("(", " OR ", ")"))
 +        case _ =>
 +          builder.append(buildQuerySingleString(container))
 +      }
 +    }
 +
 +    builder.mkString(" AND ")
 +  }
 +}
 +
 +trait IndexProvider {
 +  //TODO: Seq need to be changed into stream
 +  def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId]
 +  def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[EdgeId]]
 +
 +  def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId]
 +  def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[VertexId]]
 +
 +  def mutateVertices(vertices: Seq[S2Vertex]): Seq[Boolean]
 +  def mutateVerticesAsync(vertices: Seq[S2Vertex]): Future[Seq[Boolean]]
 +
 +  def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean]
 +  def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]]
 +
 +  def shutdown(): Unit
 +}
 +
 +class LuceneIndexProvider(config: Config) extends IndexProvider {
 +  import IndexProvider._
 +  import scala.collection.mutable
 +  import scala.collection.JavaConverters._
++  import GlobalIndex._
 +
 +  val analyzer = new StandardAnalyzer()
 +  val writers = mutable.Map.empty[String, IndexWriter]
 +  val directories = mutable.Map.empty[String, BaseDirectory]
 +
 +  private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = {
 +    writers.getOrElseUpdate(indexName, {
 +      val dir = directories.getOrElseUpdate(indexName, new RAMDirectory())
 +      val indexConfig = new IndexWriterConfig(analyzer)
 +      new IndexWriter(dir, indexConfig)
 +    })
 +  }
 +
 +  private def toDocument(globalIndex: GlobalIndex, vertex: S2Vertex): Option[Document] = {
 +    val props = vertex.props.asScala
 +    val exist = props.exists(t => globalIndex.propNamesSet(t._1))
 +    if (!exist) None
 +    else {
 +      val doc = new Document()
 +      val id = vertex.id.toString
 +
 +      doc.add(new StringField(vidField, id, Field.Store.YES))
 +      doc.add(new StringField(serviceField, vertex.serviceName, Field.Store.YES))
 +      doc.add(new StringField(serviceColumnField, vertex.columnName, Field.Store.YES))
 +
 +      props.foreach { case (dim, s2VertexProperty) =>
 +        val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO
 +        val field = s2VertexProperty.columnMeta.dataType match {
 +          case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex)
 +          case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex)
 +        }
 +        doc.add(field)
 +      }
 +
 +      Option(doc)
 +    }
 +  }
 +
 +  private def toDocument(globalIndex: GlobalIndex, edge: S2Edge): Option[Document] = {
 +    val props = edge.propsWithTs.asScala
 +    val exist = props.exists(t => globalIndex.propNamesSet(t._1))
 +    if (!exist) None
 +    else {
 +      val doc = new Document()
 +      val id = edge.edgeId.toString
 +
 +      doc.add(new StringField(eidField, id, Field.Store.YES))
 +      doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES))
 +      doc.add(new StringField(labelField, edge.label(), Field.Store.YES))
 +
 +      props.foreach { case (dim, s2Property) =>
 +        val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO
 +        val field = s2Property.labelMeta.dataType match {
 +          case "string" => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex)
 +          case _ => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex)
 +        }
 +        doc.add(field)
 +      }
 +
 +      Option(doc)
 +    }
 +  }
 +
 +  override def mutateVertices(vertices: Seq[S2Vertex]): Seq[Boolean] = {
 +    val globalIndexOptions = GlobalIndex.findAll()
 +
 +    globalIndexOptions.map { globalIndex =>
 +      val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
 +
 +      vertices.foreach { vertex =>
 +        toDocument(globalIndex, vertex).foreach { doc =>
 +          writer.addDocument(doc)
 +        }
 +      }
 +
 +      writer.commit()
 +    }
 +
 +    vertices.map(_ => true)
 +  }
 +
 +  override def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] = {
 +    val globalIndexOptions = GlobalIndex.findAll()
 +
 +    globalIndexOptions.map { globalIndex =>
 +      val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
 +
 +      edges.foreach { edge =>
 +        toDocument(globalIndex, edge).foreach { doc =>
 +          writer.addDocument(doc)
 +        }
 +      }
 +
 +      writer.commit()
 +    }
 +
 +    edges.map(_ => true)
 +  }
 +
 +  override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = {
 +    val field = eidField
 +    val ids = new java.util.HashSet[EdgeId]
 +
 +    GlobalIndex.findGlobalIndex(hasContainers).map { globalIndex =>
 +      val queryString = buildQueryString(hasContainers)
 +
 +      try {
 +        val q = new QueryParser(field, analyzer).parse(queryString)
 +
 +        val reader = DirectoryReader.open(directories(globalIndex.indexName))
 +        val searcher = new IndexSearcher(reader)
 +
 +        val docs = searcher.search(q, hitsPerPage)
 +
 +        docs.scoreDocs.foreach { scoreDoc =>
 +          val document = searcher.doc(scoreDoc.doc)
 +          val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get
 +          ids.add(id);
 +        }
 +
 +        reader.close()
 +        ids
 +      } catch {
 +        case ex: ParseException =>
 +          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
 +          ids
 +      }
 +    }
 +
 +    new util.ArrayList[EdgeId](ids)
 +  }
 +
 +  override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = {
 +    val field = vidField
 +    val ids = new java.util.HashSet[VertexId]
 +
 +    GlobalIndex.findGlobalIndex(hasContainers).map { globalIndex =>
 +      val queryString = buildQueryString(hasContainers)
 +
 +      try {
 +        val q = new QueryParser(field, analyzer).parse(queryString)
 +
 +        val reader = DirectoryReader.open(directories(globalIndex.indexName))
 +        val searcher = new IndexSearcher(reader)
 +
 +        val docs = searcher.search(q, hitsPerPage)
 +
 +        docs.scoreDocs.foreach { scoreDoc =>
 +          val document = searcher.doc(scoreDoc.doc)
 +          val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get
 +          ids.add(id)
 +        }
 +
 +        reader.close()
 +        ids
 +      } catch {
 +        case ex: ParseException =>
 +          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
 +          ids
 +      }
 +    }
 +
 +    new util.ArrayList[VertexId](ids)
 +  }
 +
 +  override def shutdown(): Unit = {
 +    writers.foreach { case (_, writer) => writer.close() }
 +  }
 +
 +  override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
 +
 +  override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers))
 +
 +  override def mutateVerticesAsync(vertices: Seq[S2Vertex]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices))
 +
 +  override def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges))
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
index bb18949,1d1dfe2..347c083
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
@@@ -6,9 -6,9 +6,15 @@@
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
++<<<<<<< HEAD
 + * 
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + * 
++=======
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
++>>>>>>> S2GRAPH-152
   * Unless required by applicable law or agreed to in writing,
   * software distributed under the License is distributed on an
   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
----------------------------------------------------------------------
diff --cc s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index 3157e18,87eac0e..d8367d7
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@@ -19,9 -19,9 +19,6 @@@
  
  package org.apache.s2graph.core.tinkerpop
  
--import java.util
--
--import com.typesafe.config.ConfigFactory
  import org.apache.commons.configuration.Configuration
  import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
  import org.apache.s2graph.core.Management.JsonModel.Prop
@@@ -34,7 -33,7 +30,7 @@@ import org.apache.s2graph.core.utils.lo
  import org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData
  import org.apache.tinkerpop.gremlin.structure.{Element, Graph, T}
  import org.apache.tinkerpop.gremlin.{AbstractGraphProvider, LoadGraphWith}
--
++import java.util
  import scala.collection.JavaConverters._
  
  object S2GraphProvider {