You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:42 UTC
[02/11] incubator-s2graph git commit: separate Storage into multiple
small interfaces such as EdgeFetcher/VertexMutator, ...
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
deleted file mode 100644
index d29ccce..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.rocks
-
-import java.util.concurrent.locks.ReentrantLock
-
-import com.google.common.cache.{Cache, LoadingCache}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.S2GraphLike
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.utils.logger
-import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class RocksStorageWritable(val graph: S2GraphLike,
- val serDe: StorageSerDe,
- val reader: StorageReadable,
- val db: RocksDB,
- val vdb: RocksDB,
- val lockMap: LoadingCache[String, ReentrantLock]) extends DefaultOptimisticMutator(graph, serDe, reader) {
-
- override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
- if (kvs.isEmpty) {
- Future.successful(MutateResponse.Success)
- } else {
- val ret = {
- val (kvsV, kvsE) = kvs.partition(kv => Bytes.equals(kv.cf, SKeyValue.VertexCf))
- val writeBatchV = buildWriteBatch(kvsV)
- val writeBatchE = buildWriteBatch(kvsE)
- val writeOptions = new WriteOptions
- try {
- vdb.write(writeOptions, writeBatchV)
- db.write(writeOptions, writeBatchE)
- true
- } catch {
- case e: Exception =>
- logger.error(s"writeAsyncSimple failed.", e)
- false
- } finally {
- writeBatchV.close()
- writeBatchE.close()
- writeOptions.close()
- }
- }
-
- Future.successful(new MutateResponse(ret))
- }
- }
-
-
- override def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext) = {
- def op = {
- val writeOptions = new WriteOptions
- try {
- val fetchedValue = db.get(requestKeyValue.row)
- val innerRet = expectedOpt match {
- case None =>
- if (fetchedValue == null) {
-
- db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
- true
- } else {
- false
- }
- case Some(kv) =>
- if (fetchedValue == null) {
- false
- } else {
- if (Bytes.compareTo(fetchedValue, kv.value) == 0) {
- db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
- true
- } else {
- false
- }
- }
- }
-
- Future.successful(new MutateResponse(innerRet))
- } catch {
- case e: RocksDBException =>
- logger.error(s"Write lock failed", e)
- Future.successful(MutateResponse.Failure)
- } finally {
- writeOptions.close()
- }
- }
-
- withLock(requestKeyValue.row)(op)
- }
-
- private def buildWriteBatch(kvs: Seq[SKeyValue]): WriteBatch = {
- val writeBatch = new WriteBatch()
- kvs.foreach { kv =>
- kv.operation match {
- case SKeyValue.Put => writeBatch.put(kv.row, kv.value)
- case SKeyValue.Delete => writeBatch.remove(kv.row)
- case SKeyValue.Increment => writeBatch.merge(kv.row, kv.value)
- case _ => throw new RuntimeException(s"not supported rpc operation. ${kv.operation}")
- }
- }
- writeBatch
- }
-
- private def withLock[A](key: Array[Byte])(op: => A): A = {
- val lockKey = Bytes.toString(key)
- val lock = lockMap.get(lockKey)
-
- try {
- lock.lock
- op
- } finally {
- lock.unlock()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
new file mode 100644
index 0000000..20acfaa
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.{S2GraphLike, S2VertexLike, VertexBulkFetcher}
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.rocksdb.RocksDB
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksVertexBulkFetcher(val graph: S2GraphLike,
+                             val config: Config,
+                             val db: RocksDB,
+                             val vdb: RocksDB,
+                             val serDe: StorageSerDe,
+                             val io: StorageIO) extends VertexBulkFetcher {
+  import RocksStorage._
+
+  /**
+   * Scans the vertex database once per hTable group of service columns and collects
+   * every deserialized vertex whose service column belongs to that group.
+   */
+  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
+    import scala.collection.mutable
+
+    val collected = new ArrayBuffer[S2VertexLike]()
+    val columnsByTable = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq
+    columnsByTable.foreach { case (hTableName, columns) =>
+      val wanted = columns.toSet
+      val pending = mutable.ListBuffer.empty[SKeyValue]
+
+      // Decodes the buffered rows of one vertex and keeps it if its column is wanted.
+      def flush(): Unit =
+        if (pending.nonEmpty) {
+          val decoded = serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(pending, None)
+          decoded.filter(v => wanted(v.serviceColumn)).foreach(collected += _)
+        }
+
+      // Key that opened the current group; `prevTrim` is 0 only while `prevRow` is still empty.
+      var prevRow = Array.empty[Byte]
+      var prevTrim = 0
+      val cursor = vdb.newIterator()
+      try {
+        cursor.seekToFirst()
+        while (cursor.isValid) {
+          val row = cursor.key()
+          // Rows belong together while their keys agree on everything but the final byte.
+          val sameGroup = Bytes.equals(prevRow, 0, prevRow.length - prevTrim, row, 0, row.length - 1)
+          if (!sameGroup) {
+            flush()
+            prevRow = row
+            prevTrim = 1
+            pending.clear()
+          }
+          pending += SKeyValue(table, cursor.key(), SKeyValue.VertexCf, qualifier, cursor.value(), System.currentTimeMillis())
+          cursor.next()
+        }
+        flush() // the last group has no following row to trigger it
+      } finally {
+        cursor.close()
+      }
+    }
+
+    Future.successful(collected)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
new file mode 100644
index 0000000..6becd98
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.rocksdb.RocksDB
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksVertexFetcher(val graph: S2GraphLike,
+                         val config: Config,
+                         val db: RocksDB,
+                         val vdb: RocksDB,
+                         val serDe: StorageSerDe,
+                         val io: StorageIO) extends VertexFetcher {
+  // Builds the storage request for `vertex` and reads its raw key-values through RocksStorage.
+  private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+    val rpc = RocksStorage.buildRequest(queryRequest, vertex)
+    RocksStorage.fetchKeyValues(vdb, db, rpc)
+  }
+
+  // Looks up every given vertex and returns the ones that could be deserialized.
+  // Best effort: a failed lookup is logged (no longer swallowed silently) and yields no vertex.
+  override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
+    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] =
+      if (kvs.isEmpty) Nil else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+
+    val futures = vertices.map { vertex =>
+      val queryParam = QueryParam.Empty
+      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+
+      fetchKeyValues(queryRequest, vertex).map(fromResult(_, vertex.serviceColumn.schemaVersion)).recover {
+        case scala.util.control.NonFatal(ex) =>
+          org.apache.s2graph.core.utils.logger.error(s"fetchVertices failed. vertex: $vertex", ex)
+          Nil
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
deleted file mode 100644
index 8cd32d4..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.s2graph.core.storage
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.LabelMeta
-import org.apache.s2graph.core.utils.logger
-
-import scala.concurrent.{ExecutionContext, Future}
-
-abstract class DefaultOptimisticMutator(graph: S2GraphLike,
- serDe: StorageSerDe,
- reader: StorageReadable) extends OptimisticMutator {
- val fetcher = reader
-
- lazy val io: StorageIO = new StorageIO(graph, serDe)
- lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, this, reader)
-
-// private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-// mutator.writeToStorage(cluster, kvs, withWait)
-
- def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
- requestTs: Long,
- retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
- if (stepInnerResult.isEmpty) Future.successful(true)
- else {
- val head = stepInnerResult.edgeWithScores.head
- val zkQuorum = head.edge.innerLabel.hbaseZkAddr
- val futures = for {
- edgeWithScore <- stepInnerResult.edgeWithScores
- } yield {
- val edge = edgeWithScore.edge
-
- val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-
- val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
- serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- io.buildIncrementsAsync(indexEdge, -1L)
- }
-
- /* reverted direction */
- val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
- serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- io.buildIncrementsAsync(indexEdge, -1L)
- }
-
- val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-
- writeToStorage(zkQuorum, mutations, withWait = true)
- }
-
- Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
- }
- }
-
- def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- if (vertex.op == GraphUtil.operations("delete")) {
- writeToStorage(zkQuorum,
- serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
- } else if (vertex.op == GraphUtil.operations("deleteAll")) {
- logger.info(s"deleteAll for vertex is truncated. $vertex")
- Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
- } else {
- writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
- }
- }
-
- def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
- val mutations = _edges.flatMap { edge =>
- val (_, edgeUpdate) =
- if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
- else S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-
- if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
- io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
- }
-
- writeToStorage(zkQuorum, mutations, withWait).map { ret =>
- _edges.zipWithIndex.map { case (edge, idx) =>
- idx -> ret.isSuccess
- }
- }
- }
-
- def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
- def mutateEdgesInner(edges: Seq[S2EdgeLike],
- checkConsistency: Boolean,
- withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- assert(edges.nonEmpty)
- // TODO:: remove after code review: unreachable code
- if (!checkConsistency) {
-
- val futures = edges.map { edge =>
- val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
- val mutations =
- io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-
- if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-
- writeToStorage(zkQuorum, mutations, withWait)
- }
- Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
- } else {
- fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
- conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
- }
- }
- }
-
- val edgeWithIdxs = _edges.zipWithIndex
- val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
- (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
- } toSeq
-
- val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
- val edges = edgeGroup.map(_._1)
- val idxs = edgeGroup.map(_._2)
- // After deleteAll, process others
- val mutateEdgeFutures = edges.toList match {
- case head :: tail =>
- val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
-
- //TODO: decide what we will do on failure on vertex put
- val puts = io.buildVertexPutsAsync(head)
- val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
- Seq(edgeFuture, vertexFuture)
- case Nil => Nil
- }
-
- val composed = for {
- // deleteRet <- Future.sequence(deleteAllFutures)
- mutateRet <- Future.sequence(mutateEdgeFutures)
- } yield mutateRet
-
- composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
- }
-
- Future.sequence(mutateEdges).map { squashedRets =>
- squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
- }
- }
-
- def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
- val futures = for {
- edge <- edges
- } yield {
- val kvs = for {
- relEdge <- edge.relatedEdges
- edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
- } yield {
- val countWithTs = edge.propertyValueInner(LabelMeta.count)
- val countVal = countWithTs.innerVal.toString().toLong
- io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
- }
- writeToStorage(zkQuorum, kvs, withWait = withWait)
- }
-
- Future.sequence(futures)
- }
-
- def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- val kvs = io.buildDegreePuts(edge, degreeVal)
-
- writeToStorage(zkQuorum, kvs, withWait = true)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala
new file mode 100644
index 0000000..aa0c6b5
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.utils
+
+import java.util.concurrent.atomic.AtomicInteger
+
+trait ImportStatus {
+  /** Total amount the `done` counter is measured against. */
+  val total: Int
+  /** Counter of how much has been done so far. */
+  val done: AtomicInteger
+  def isCompleted: Boolean
+  /** Progress expressed as an integer percentage. */
+  def percentage: Int
+}
+
+class ImportRunningStatus(val total: Int) extends ImportStatus {
+  require(total > 0, s"Total should be positive: $total")
+
+  // Progress counter compared against `total`; starts at zero.
+  val done = new AtomicInteger(0)
+  def isCompleted: Boolean = total == done.get
+  // Multiply as Long: `100 * done.get` overflows Int once done exceeds ~21.4 million.
+  def percentage: Int = (100L * done.get / total).toInt
+}
+
+case object ImportDoneStatus extends ImportStatus {
+  // Fixed, already-completed status: one out of one done, reported as 100 percent.
+  val total: Int = 1
+  val done: AtomicInteger = new AtomicInteger(1)
+
+  def isCompleted: Boolean = true
+
+  def percentage: Int = 100
+}
+
+object ImportStatus {
+  def apply(total: Int): ImportStatus = new ImportRunningStatus(total)
+  // Extractor exposing (isCompleted, total, current done count) for pattern matching.
+  def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] =
+    Option((importResult.isCompleted, importResult.total, importResult.done.get))
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
new file mode 100644
index 0000000..300106a
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.utils
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike}
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object Importer {
+  // Loads core-site.xml and hdfs-site.xml from `hdfsConfDir` into a new Configuration.
+  // A missing directory only logs a warning; an unreadable one throws IllegalStateException.
+  def toHDFSConfiguration(hdfsConfDir: String): Configuration = {
+    val conf = new Configuration
+    val hdfsConfDirectory = new File(hdfsConfDir)
+
+    if (!hdfsConfDirectory.exists()) {
+      logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..")
+    } else if (!hdfsConfDirectory.isDirectory || !hdfsConfDirectory.canRead) {
+      throw new IllegalStateException(s"HDFS configuration directory ($hdfsConfDirectory) cannot be read.")
+    } else {
+      // Parent/child Path avoids the malformed "file:////..." URI built by string concatenation.
+      val parent = hdfsConfDirectory.getAbsolutePath
+      Seq("core-site.xml", "hdfs-site.xml").foreach(name => conf.addResource(new Path(parent, name)))
+    }
+    conf
+  }
+}
+
+trait Importer {
+  // Completion flag; marked volatile so a write is visible to other threads reading it.
+  @volatile var isFinished: Boolean = false
+
+  /** Runs the import described by `config`; the future completes with an Importer. */
+  def run(config: Config)(implicit ec: ExecutionContext): Future[Importer]
+
+  def status: Boolean = isFinished
+
+  // Stores the new flag, then returns the value read back from the field.
+  def setStatus(otherStatus: Boolean): Boolean = { isFinished = otherStatus; isFinished }
+
+  def close(): Unit
+}
+
+case class IdentityImporter(graph: S2GraphLike) extends Importer {
+  // No-op importer: `run` ignores the config and completes immediately with this instance.
+  override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] =
+    Future.successful(this)
+
+  override def close(): Unit = ()
+}
+
+object HDFSImporter {
+
+  import scala.collection.JavaConverters._
+
+  // Config keys: the list of src/tgt path entries and the Hadoop configuration directory.
+  val PathsKey = "paths"
+  val HDFSConfDirKey = "hdfsConfDir"
+
+  /** Reads the `paths` config list into a map from each entry's "src" to its "tgt". */
+  def extractPaths(config: Config): Map[String, String] = {
+    val entries = config.getConfigList(PathsKey).asScala
+    val pairs = for (entry <- entries) yield entry.getString("src") -> entry.getString("tgt")
+
+    pairs.toMap
+  }
+}
+
+case class HDFSImporter(graph: S2GraphLike) extends Importer {
+
+  import HDFSImporter._
+
+  /**
+   * Copies every configured source path to its target with `copyToLocalFile`, inside a
+   * Future run on the implicit execution context; the future completes with this importer.
+   */
+  override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
+    Future {
+      // Inputs: the src -> tgt pairs and the directory holding the Hadoop site files.
+      val srcToTgt = extractPaths(config)
+      val confDir = config.getString(HDFSConfDirKey)
+
+      // File system handle obtained from the configuration built for that directory.
+      val hadoopConfig = Importer.toHDFSConfiguration(confDir)
+      val fileSystem = FileSystem.get(hadoopConfig)
+
+      // Copy the pairs one after another.
+      for ((remote, local) <- srcToTgt) {
+        fileSystem.copyToLocalFile(new Path(remote), new Path(local))
+      }
+
+      this
+    }
+  }
+
+  // override def status: ImportStatus = ???
+
+  override def close(): Unit = {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
new file mode 100644
index 0000000..6d95c93
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.fetcher
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.{Query, QueryParam}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+
+class EdgeFetcherTest extends IntegrateCommon {
+
+  import scala.collection.JavaConverters._
+
+  // Smoke test: creates a label configured with MemoryModelFetcher, imports it, prints fetched edges.
+  test("MemoryModelFetcher") {
+    // 1. create label.
+    // 2. importLabel.
+    // 3. fetch.
+
+    val maxWait = Duration("60 seconds")
+    val labelName = "fetcher_test"
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+    val options =
+      s"""{
+         |
+         | "importer": {
+         | "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.utils.IdentityImporter"
+         | },
+         | "fetcher": {
+         | "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.MemoryModelFetcher"
+         | }
+         |}""".stripMargin
+
+    // Delete a label with the same name, if one is found, before creating it again.
+    for (stale <- Label.findByName(labelName, useCache = false)) Label.delete(stale.id.get)
+
+    val label = management.createLabel(
+      labelName,
+      serviceColumn,
+      serviceColumn,
+      true,
+      service.serviceName,
+      Seq.empty[Index].asJava,
+      Seq.empty[Prop].asJava,
+      "strong",
+      null,
+      -1,
+      "v3",
+      "gz",
+      options
+    )
+    val importing = graph.modelManager.importModel(label, ConfigFactory.parseString(options))(ExecutionContext.Implicits.global)
+    Await.ready(importing, maxWait)
+
+    Thread.sleep(1000)
+
+    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon")
+    val queryParam = QueryParam(labelName = labelName)
+    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+
+    val stepResult = Await.result(graph.getEdges(query), maxWait)
+    for (es <- stepResult.edgeWithScores) println(es.edge)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
deleted file mode 100644
index 6c76cdf..0000000
--- a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import com.typesafe.config.ConfigFactory
-import org.apache.s2graph.core.Integrate.IntegrateCommon
-import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.{Query, QueryParam}
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext}
-
-class FetcherTest extends IntegrateCommon {
-
- import scala.collection.JavaConverters._
-
- test("MemoryModelFetcher") {
- // 1. create label.
- // 2. importLabel.
- // 3. fetch.
- val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
- val serviceColumn =
- management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
- val labelName = "fetcher_test"
- val options =
- s"""{
- |
- | "importer": {
- | "${ModelManager.ClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
- | },
- | "fetcher": {
- | "${ModelManager.ClassNameKey}": "org.apache.s2graph.core.model.MemoryModelFetcher"
- | }
- |}""".stripMargin
-
- Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
-
- val label = management.createLabel(
- labelName,
- serviceColumn,
- serviceColumn,
- true,
- service.serviceName,
- Seq.empty[Index].asJava,
- Seq.empty[Prop].asJava,
- "strong",
- null,
- -1,
- "v3",
- "gz",
- options
- )
- val config = ConfigFactory.parseString(options)
- val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
- Await.ready(importerFuture, Duration("60 seconds"))
-
- Thread.sleep(1000)
-
- val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon")
- val queryParam = QueryParam(labelName = labelName)
-
- val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
- val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
-
- stepResult.edgeWithScores.foreach { es =>
- println(es.edge)
- }
- }
-}