You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:42 UTC

[02/11] incubator-s2graph git commit: separate Storage into multiple small interfaces such as EdgeFetcher/VertexMutator, ...

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
deleted file mode 100644
index d29ccce..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.rocks
-
-import java.util.concurrent.locks.ReentrantLock
-
-import com.google.common.cache.{Cache, LoadingCache}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.S2GraphLike
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.utils.logger
-import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class RocksStorageWritable(val graph: S2GraphLike,
-                           val serDe: StorageSerDe,
-                           val reader: StorageReadable,
-                           val db: RocksDB,
-                           val vdb: RocksDB,
-                           val lockMap: LoadingCache[String, ReentrantLock]) extends DefaultOptimisticMutator(graph, serDe, reader) {
-
-  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
-    if (kvs.isEmpty) {
-      Future.successful(MutateResponse.Success)
-    } else {
-      val ret = {
-        val (kvsV, kvsE) = kvs.partition(kv => Bytes.equals(kv.cf, SKeyValue.VertexCf))
-        val writeBatchV = buildWriteBatch(kvsV)
-        val writeBatchE = buildWriteBatch(kvsE)
-        val writeOptions = new WriteOptions
-        try {
-          vdb.write(writeOptions, writeBatchV)
-          db.write(writeOptions, writeBatchE)
-          true
-        } catch {
-          case e: Exception =>
-            logger.error(s"writeAsyncSimple failed.", e)
-            false
-        } finally {
-          writeBatchV.close()
-          writeBatchE.close()
-          writeOptions.close()
-        }
-      }
-
-      Future.successful(new MutateResponse(ret))
-    }
-  }
-
-
-  override def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext) = {
-    def op = {
-      val writeOptions = new WriteOptions
-      try {
-        val fetchedValue = db.get(requestKeyValue.row)
-        val innerRet = expectedOpt match {
-          case None =>
-            if (fetchedValue == null) {
-
-              db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
-              true
-            } else {
-              false
-            }
-          case Some(kv) =>
-            if (fetchedValue == null) {
-              false
-            } else {
-              if (Bytes.compareTo(fetchedValue, kv.value) == 0) {
-                db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
-                true
-              } else {
-                false
-              }
-            }
-        }
-
-        Future.successful(new MutateResponse(innerRet))
-      } catch {
-        case e: RocksDBException =>
-          logger.error(s"Write lock failed", e)
-          Future.successful(MutateResponse.Failure)
-      } finally {
-        writeOptions.close()
-      }
-    }
-
-    withLock(requestKeyValue.row)(op)
-  }
-
-  private def buildWriteBatch(kvs: Seq[SKeyValue]): WriteBatch = {
-    val writeBatch = new WriteBatch()
-    kvs.foreach { kv =>
-      kv.operation match {
-        case SKeyValue.Put => writeBatch.put(kv.row, kv.value)
-        case SKeyValue.Delete => writeBatch.remove(kv.row)
-        case SKeyValue.Increment => writeBatch.merge(kv.row, kv.value)
-        case _ => throw new RuntimeException(s"not supported rpc operation. ${kv.operation}")
-      }
-    }
-    writeBatch
-  }
-
-  private def withLock[A](key: Array[Byte])(op: => A): A = {
-    val lockKey = Bytes.toString(key)
-    val lock = lockMap.get(lockKey)
-
-    try {
-      lock.lock
-      op
-    } finally {
-      lock.unlock()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
new file mode 100644
index 0000000..20acfaa
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.{S2GraphLike, S2VertexLike, VertexBulkFetcher}
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.rocksdb.RocksDB
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * [[VertexBulkFetcher]] that materializes every vertex stored in the RocksDB vertex store `vdb`.
+ */
+class RocksVertexBulkFetcher(val graph: S2GraphLike,
+                             val config: Config,
+                             val db: RocksDB,
+                             val vdb: RocksDB,
+                             val serDe: StorageSerDe,
+                             val io: StorageIO) extends VertexBulkFetcher {
+  import RocksStorage._
+
+  /**
+   * Scans `vdb` from its first key, groups consecutive rows whose keys agree on everything but
+   * the last byte, deserializes each group into a vertex and keeps the vertices whose service
+   * column is returned by `ServiceColumn.findAll()`. All work happens synchronously on the
+   * calling thread; the returned future is already completed.
+   */
+  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
+    val vertices = new ArrayBuffer[S2VertexLike]()
+    // Membership is checked against every known column at once, so a single pass over `vdb`
+    // suffices no matter how many HBase table names those columns map to.
+    val knownColumns = ServiceColumn.findAll().toSet
+    val pending = scala.collection.mutable.ListBuffer.empty[SKeyValue]
+
+    // Deserializes the buffered rows of one vertex, keeps it if its column is known, then resets.
+    def flush(): Unit = if (pending.nonEmpty) {
+      serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(pending, None)
+        .filter(v => knownColumns(v.serviceColumn))
+        .foreach(vertices += _)
+      pending.clear()
+    }
+
+    if (knownColumns.nonEmpty) {
+      val iter = vdb.newIterator()
+      var groupKey = Array.empty[Byte]
+      var groupKeyLen = 0 // length of the compared prefix of `groupKey`; 0 until a row is seen
+
+      try {
+        iter.seekToFirst()
+        while (iter.isValid) {
+          val row = iter.key()
+          if (!Bytes.equals(groupKey, 0, groupKeyLen, row, 0, row.length - 1)) {
+            flush()
+            groupKey = row
+            groupKeyLen = row.length - 1
+          }
+          pending += SKeyValue(table, row, SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis())
+          iter.next()
+        }
+        flush()
+      } finally iter.close()
+    }
+
+    Future.successful(vertices)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
new file mode 100644
index 0000000..6becd98
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.rocksdb.RocksDB
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/** [[VertexFetcher]] that reads vertices from RocksDB through `RocksStorage.fetchKeyValues`. */
+class RocksVertexFetcher(val graph: S2GraphLike,
+                         val config: Config,
+                         val db: RocksDB,
+                         val vdb: RocksDB,
+                         val serDe: StorageSerDe,
+                         val io: StorageIO) extends VertexFetcher {
+  // Builds the storage request for `vertex` and fetches its raw key-values.
+  private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] =
+    RocksStorage.fetchKeyValues(vdb, db, RocksStorage.buildRequest(queryRequest, vertex))
+
+  /**
+   * Looks up each vertex independently; vertices with no stored key-values are omitted.
+   * A failed lookup is logged and yields no vertex instead of failing the whole batch.
+   */
+  override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
+    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] =
+      if (kvs.isEmpty) Nil else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+
+    val futures = vertices.map { vertex =>
+      val queryParam = QueryParam.Empty
+      val queryRequest = QueryRequest(Query.toQuery(Seq(vertex), Seq(queryParam)), stepIdx = -1, vertex, queryParam)
+      fetchKeyValues(queryRequest, vertex)
+        .map(kvs => fromResult(kvs, vertex.serviceColumn.schemaVersion))
+        .recover { case scala.util.control.NonFatal(ex) =>
+          org.apache.s2graph.core.utils.logger.error(s"fetchVertices failed for vertex: $vertex", ex)
+          Nil
+        }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
deleted file mode 100644
index 8cd32d4..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.s2graph.core.storage
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.LabelMeta
-import org.apache.s2graph.core.utils.logger
-
-import scala.concurrent.{ExecutionContext, Future}
-
-abstract class DefaultOptimisticMutator(graph: S2GraphLike,
-                                        serDe: StorageSerDe,
-                                        reader: StorageReadable) extends OptimisticMutator {
-  val fetcher = reader
-
-  lazy val io: StorageIO = new StorageIO(graph, serDe)
-  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, this, reader)
-
-//  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-//    mutator.writeToStorage(cluster, kvs, withWait)
-
-  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
-                                    requestTs: Long,
-                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
-    if (stepInnerResult.isEmpty) Future.successful(true)
-    else {
-      val head = stepInnerResult.edgeWithScores.head
-      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
-      val futures = for {
-        edgeWithScore <- stepInnerResult.edgeWithScores
-      } yield {
-        val edge = edgeWithScore.edge
-
-        val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-
-        val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
-          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            io.buildIncrementsAsync(indexEdge, -1L)
-        }
-
-        /* reverted direction */
-        val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
-        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
-          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
-            io.buildIncrementsAsync(indexEdge, -1L)
-        }
-
-        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-
-        writeToStorage(zkQuorum, mutations, withWait = true)
-      }
-
-      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
-    }
-  }
-
-  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-    if (vertex.op == GraphUtil.operations("delete")) {
-      writeToStorage(zkQuorum,
-        serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
-    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
-      logger.info(s"deleteAll for vertex is truncated. $vertex")
-      Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
-    } else {
-      writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
-    }
-  }
-
-  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
-    val mutations = _edges.flatMap { edge =>
-      val (_, edgeUpdate) =
-        if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
-        else S2Edge.buildOperation(None, Seq(edge))
-
-      val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-
-      if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-      io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-    }
-
-    writeToStorage(zkQuorum, mutations, withWait).map { ret =>
-      _edges.zipWithIndex.map { case (edge, idx) =>
-        idx -> ret.isSuccess
-      }
-    }
-  }
-
-  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
-    def mutateEdgesInner(edges: Seq[S2EdgeLike],
-                         checkConsistency: Boolean,
-                         withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-      assert(edges.nonEmpty)
-      // TODO:: remove after code review: unreachable code
-      if (!checkConsistency) {
-
-        val futures = edges.map { edge =>
-          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-
-          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-          val mutations =
-            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-
-          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-
-          writeToStorage(zkQuorum, mutations, withWait)
-        }
-        Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
-      } else {
-        fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
-          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
-        }
-      }
-    }
-
-    val edgeWithIdxs = _edges.zipWithIndex
-    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
-      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
-    } toSeq
-
-    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
-      val edges = edgeGroup.map(_._1)
-      val idxs = edgeGroup.map(_._2)
-      // After deleteAll, process others
-      val mutateEdgeFutures = edges.toList match {
-        case head :: tail =>
-          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
-
-          //TODO: decide what we will do on failure on vertex put
-          val puts = io.buildVertexPutsAsync(head)
-          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
-          Seq(edgeFuture, vertexFuture)
-        case Nil => Nil
-      }
-
-      val composed = for {
-        //        deleteRet <- Future.sequence(deleteAllFutures)
-        mutateRet <- Future.sequence(mutateEdgeFutures)
-      } yield mutateRet
-
-      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
-    }
-
-    Future.sequence(mutateEdges).map { squashedRets =>
-      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
-    }
-  }
-
-  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
-    val futures = for {
-      edge <- edges
-    } yield {
-      val kvs = for {
-        relEdge <- edge.relatedEdges
-        edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
-      } yield {
-        val countWithTs = edge.propertyValueInner(LabelMeta.count)
-        val countVal = countWithTs.innerVal.toString().toLong
-        io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
-      }
-      writeToStorage(zkQuorum, kvs, withWait = withWait)
-    }
-
-    Future.sequence(futures)
-  }
-
-  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
-    val kvs = io.buildDegreePuts(edge, degreeVal)
-
-    writeToStorage(zkQuorum, kvs, withWait = true)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala
new file mode 100644
index 0000000..aa0c6b5
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.utils
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/** Progress of an import job: `done` units finished out of `total`. */
+trait ImportStatus {
+  /** Counter of units completed so far. */
+  val done: AtomicInteger
+  /** Number of units the import consists of. */
+  val total: Int
+  def isCompleted: Boolean
+  /** Completed share as a whole-number percentage. */
+  def percentage: Int
+}
+
+/** Status of an import in progress; `done` starts at 0 and `total` must be positive. */
+class ImportRunningStatus(val total: Int) extends ImportStatus {
+  require(total > 0, s"Total should be positive: $total")
+  val done: AtomicInteger = new AtomicInteger(0)
+
+  def isCompleted: Boolean = total == done.get
+  // Multiply in Long: `100 * done.get` overflows Int once done exceeds Int.MaxValue / 100.
+  def percentage: Int = (100L * done.get / total).toInt
+}
+
+/** Constant status of an already finished import: 1 of 1 done, 100 percent. */
+case object ImportDoneStatus extends ImportStatus {
+  val total: Int = 1
+  val done: AtomicInteger = new AtomicInteger(1)
+  def isCompleted: Boolean = true
+  def percentage: Int = 100
+}
+
+/** Factory and extractor: `apply` builds a running status, `unapply` yields (isCompleted, total, done). */
+object ImportStatus {
+  def apply(total: Int): ImportStatus = new ImportRunningStatus(total)
+
+  def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] =
+    Some((importResult.isCompleted, importResult.total, importResult.done.get))
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
new file mode 100644
index 0000000..300106a
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.utils
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike}
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object Importer {
+  /**
+   * Registers `core-site.xml` and `hdfs-site.xml` from `hdfsConfDir` as resources of a new
+   * [[Configuration]]; warns if the directory is missing, throws if it exists but is unreadable.
+   */
+  def toHDFSConfiguration(hdfsConfDir: String): Configuration = {
+    val conf = new Configuration
+    val hdfsConfDirectory = new File(hdfsConfDir)
+    if (!hdfsConfDirectory.exists()) {
+      logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..")
+    } else if (hdfsConfDirectory.isDirectory && hdfsConfDirectory.canRead) {
+      val path = hdfsConfDirectory.getAbsolutePath
+      Seq("core-site.xml", "hdfs-site.xml").foreach(name => conf.addResource(new Path(s"file:///$path/$name")))
+    } else {
+      throw new IllegalStateException(s"HDFS configuration directory ($hdfsConfDirectory) cannot be read.")
+    }
+    conf
+  }
+}
+
+/** An import job; `isFinished` is a volatile flag exposed through `status` / `setStatus`. */
+trait Importer {
+  @volatile var isFinished: Boolean = false
+
+  /** Starts the import described by `config`; the future yields an [[Importer]]. */
+  def run(config: Config)(implicit ec: ExecutionContext): Future[Importer]
+
+  def close(): Unit
+
+  def status: Boolean = isFinished
+
+  /** Stores `otherStatus` in `isFinished` and returns the flag as read back afterwards. */
+  def setStatus(otherStatus: Boolean): Boolean = { isFinished = otherStatus; isFinished }
+}
+
+/** No-op [[Importer]]: `run` completes immediately with this instance and `close` does nothing. */
+case class IdentityImporter(graph: S2GraphLike) extends Importer {
+  override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] =
+    Future.successful(this)
+  // Nothing is opened by `run`, so there is nothing to release.
+  override def close(): Unit = ()
+}
+
+object HDFSImporter {
+
+  import scala.collection.JavaConverters._
+
+  val PathsKey = "paths"
+  val HDFSConfDirKey = "hdfsConfDir"
+
+  /**
+   * Reads the config list under `PathsKey` and maps each entry's `src` string to its `tgt`
+   * string. When the same `src` occurs more than once, the last entry wins (`toMap`).
+   */
+  def extractPaths(config: Config): Map[String, String] =
+    config.getConfigList(PathsKey).asScala
+      .map(entry => entry.getString("src") -> entry.getString("tgt"))
+      .toMap
+}
+
+/** [[Importer]] that copies the configured remote paths to local paths via Hadoop `FileSystem`. */
+case class HDFSImporter(graph: S2GraphLike) extends Importer {
+
+  import HDFSImporter._
+
+  /**
+   * Inside a `Future` on `ec`: reads the src -> tgt pairs and the Hadoop conf directory from
+   * `config`, obtains a [[FileSystem]] for that configuration and copies every src to its tgt
+   * with `copyToLocalFile`.
+   *
+   * @return a future yielding this importer, or failing with whatever the steps above threw
+   */
+  override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = Future {
+    val paths = extractPaths(config)
+    val hdfsConfDir = config.getString(HDFSConfDirKey)
+    val fs = FileSystem.get(Importer.toHDFSConfiguration(hdfsConfDir))
+
+    // Copies run one after another; the first exception aborts the rest and fails the future.
+    for ((remoteSrc, localTgt) <- paths) {
+      fs.copyToLocalFile(new Path(remoteSrc), new Path(localTgt))
+    }
+
+    this
+  }
+
+  // NOTE(review): a `status: ImportStatus` override was sketched here but is left disabled.
+  //  override def status: ImportStatus = ???
+
+  // The FileSystem obtained in `run` is not retained, so nothing is released here.
+  override def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
new file mode 100644
index 0000000..6d95c93
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.fetcher
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.{Query, QueryParam}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+
+/** Integration test: a label whose options name an importer and a fetcher class is imported and queried. */
+class EdgeFetcherTest extends IntegrateCommon {
+
+  import scala.collection.JavaConverters._
+
+  test("MemoryModelFetcher") {
+    // Flow: register service/column -> (re)create the label with importer/fetcher options
+    //       -> import the model -> fetch edges for one vertex and print them.
+    val timeout = Duration("60 seconds")
+    val labelName = "fetcher_test"
+
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+    val options =
+      s"""{
+         |
+                     | "importer": {
+         |   "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.utils.IdentityImporter"
+         | },
+         | "fetcher": {
+         |   "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.MemoryModelFetcher"
+         | }
+         |}""".stripMargin
+
+    // Remove a label of the same name if one is already registered.
+    for (existing <- Label.findByName(labelName, useCache = false)) Label.delete(existing.id.get)
+
+    val label = management.createLabel(
+      labelName,
+      serviceColumn,
+      serviceColumn,
+      true,
+      service.serviceName,
+      Seq.empty[Index].asJava,
+      Seq.empty[Prop].asJava,
+      "strong",
+      null,
+      -1,
+      "v3",
+      "gz",
+      options
+    )
+    val importerFuture =
+      graph.modelManager.importModel(label, ConfigFactory.parseString(options))(ExecutionContext.Implicits.global)
+    Await.ready(importerFuture, timeout)
+
+    // NOTE(review): fixed one-second pause after the import; presumably lets the fetcher settle - confirm.
+    Thread.sleep(1000)
+
+    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon")
+    val queryParam = QueryParam(labelName = labelName)
+    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+
+    for (es <- Await.result(graph.getEdges(query), timeout).edgeWithScores) println(es.edge)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
deleted file mode 100644
index 6c76cdf..0000000
--- a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import com.typesafe.config.ConfigFactory
-import org.apache.s2graph.core.Integrate.IntegrateCommon
-import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.{Query, QueryParam}
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext}
-
-class FetcherTest extends IntegrateCommon {
-
-  import scala.collection.JavaConverters._
-
-  test("MemoryModelFetcher") {
-    // 1. create label.
-    // 2. importLabel.
-    // 3. fetch.
-    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
-    val serviceColumn =
-      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
-    val labelName = "fetcher_test"
-    val options =
-      s"""{
-         |
-                     | "importer": {
-         |   "${ModelManager.ClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
-         | },
-         | "fetcher": {
-         |   "${ModelManager.ClassNameKey}": "org.apache.s2graph.core.model.MemoryModelFetcher"
-         | }
-         |}""".stripMargin
-
-    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
-
-    val label = management.createLabel(
-      labelName,
-      serviceColumn,
-      serviceColumn,
-      true,
-      service.serviceName,
-      Seq.empty[Index].asJava,
-      Seq.empty[Prop].asJava,
-      "strong",
-      null,
-      -1,
-      "v3",
-      "gz",
-      options
-    )
-    val config = ConfigFactory.parseString(options)
-    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
-    Await.ready(importerFuture, Duration("60 seconds"))
-
-    Thread.sleep(1000)
-
-    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon")
-    val queryParam = QueryParam(labelName = labelName)
-
-    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
-    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
-
-    stepResult.edgeWithScores.foreach { es =>
-      println(es.edge)
-    }
-  }
-}