You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/09/07 04:18:22 UTC

[1/2] incubator-gearpump git commit: fix GEARPUMP-205 remove hdfs dependency from gear's classpath

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 6852b56e9 -> f8f916645


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
deleted file mode 100644
index 7a60019..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.jarstore.dfs
-
-import java.io.File
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.slf4j.Logger
-
-import org.apache.gearpump.jarstore.{FilePath, JarStoreService}
-import org.apache.gearpump.util.{Constants, LogUtil}
-
-/**
- * DFSJarStoreService stores the uploaded jars on HDFS, under the root path read from
- * the Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH config entry.
- */
-class DFSJarStoreService extends JarStoreService {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  // Assigned by init(); stays null until init() has been called.
-  private var rootPath: Path = null
-
-  override val scheme: String = "hdfs"
-
-  /**
-   * Resolves the jar store root path from the config and creates that directory,
-   * with ALL permissions for user, group and others, when it does not exist yet.
-   *
-   * @param config Config holding the jar store root path
-   * @param actorRefFactory Not used by this implementation
-   */
-  override def init(config: Config, actorRefFactory: ActorSystem): Unit = {
-    rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
-    val fs = rootPath.getFileSystem(new Configuration())
-    if (!fs.exists(rootPath)) {
-      fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
-    }
-  }
-
-  /**
-   * This function will copy the remote file to local file system, called from client side.
-   *
-   * @param localFile The destination of file path
-   * @param remotePath The remote file path from JarStore, relative to the root path
-   */
-  override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
-    val filePath = new Path(rootPath, remotePath.path)
-    val fs = filePath.getFileSystem(new Configuration())
-    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${filePath.toString}")
-    val target = new Path(localFile.toURI().toString)
-    fs.copyToLocalFile(filePath, target)
-  }
-
-  /**
-   * This function will copy the local file to the remote JarStore, called from client side.
-   * The file is stored under a randomly generated, non-negative numeric name.
-   *
-   * @param localFile The local file
-   * @return The path of the stored file, relative to the root path
-   */
-  override def copyFromLocal(localFile: File): FilePath = {
-    // Mask the sign bit instead of using Math.abs: Math.abs(Long.MinValue) is still
-    // negative, which would produce a file name starting with '-'.
-    val randomId = new java.util.Random().nextLong() & Long.MaxValue
-    val remotePath = FilePath(randomId.toString)
-    val filePath = new Path(rootPath, remotePath.path)
-    val fs = filePath.getFileSystem(new Configuration())
-    LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${filePath.toString}")
-    fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath)
-    remotePath
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
deleted file mode 100644
index 9bd7071..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.jarstore.local
-
-import java.io.File
-
-import akka.actor.{Actor, Stash}
-import akka.pattern.pipe
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
-import org.apache.gearpump.util._
-
-/**
- * LocalJarStore stores the uploaded jars on local disk and serves them through a
- * [[FileServer]] rooted at rootDirPath.
- *
- * Messages received before the file server has reported its port are stashed and
- * replayed once the actor switches to the listening state.
- */
-class LocalJarStore(rootDirPath: String) extends Actor with Stash {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
-  val rootDirectory = new File(rootDirPath)
-
-  FileUtils.forceMkdir(rootDirectory)
-
-  // Port 0 is handed to the file server; the port actually bound is reported
-  // back to this actor as a FileServer.Port message.
-  val server = new FileServer(context.system, host, 0, rootDirectory)
-
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  implicit val executionContext = context.dispatcher
-
-  // The outcome of the asynchronous bind is delivered to this actor as a message.
-  server.start pipeTo self
-
-  /** Initial state: wait for the file server to report its port. */
-  def receive: Receive = {
-    case FileServer.Port(port) =>
-      context.become(listen(port))
-      unstashAll()
-    case akka.actor.Status.Failure(cause) =>
-      // pipeTo delivers a failed bind as Status.Failure. Stashing it would leave this
-      // actor waiting forever, so surface the error and let supervision handle it.
-      LOG.error(s"Failed to start the jar store file server on $host", cause)
-      throw cause
-    case _ =>
-      stash()
-  }
-
-  /** Listening state: answer address queries with the URL of the running file server. */
-  def listen(port: Int): Receive = {
-    case GetJarStoreServer =>
-      sender ! JarStoreServerAddress(s"http://$host:$port/")
-  }
-
-  override def postStop(): Unit = {
-    server.stop
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
deleted file mode 100644
index 1ab103f..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.jarstore.local
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.jarstore.{FilePath, JarStoreService}
-import org.apache.gearpump.util._
-
-/**
- * LocalJarStoreService stores the uploaded jars on local disk.
- *
- * It asks the master for the address of the jar store file server and then uploads
- * and downloads files through a [[FileServer.Client]].
- */
-class LocalJarStoreService extends JarStoreService {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private implicit val timeout = Constants.FUTURE_TIMEOUT
-  // Both are assigned by init(); null until init() has been called.
-  private var system: akka.actor.ActorSystem = null
-  private var master: ActorRef = null
-  private implicit def dispatcher: ExecutionContext = system.dispatcher
-
-  // Upper bound for a single upload or download.
-  private val transferTimeout = Duration(60, TimeUnit.SECONDS)
-
-  override val scheme: String = "file"
-
-  /**
-   * Remembers the actor system and creates a master proxy for the masters listed in
-   * the config.
-   */
-  override def init(config: Config, system: ActorSystem): Unit = {
-    this.system = system
-    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
-      .asScala.flatMap(Util.parseHostList)
-    master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
-  }
-
-  // Resolved on first use; the resulting Future (including a failed one) is then cached.
-  // mapTo fails the Future on an unexpected reply instead of relying on an unchecked cast.
-  private lazy val client: Future[FileServer.Client] =
-    (master ? GetJarStoreServer).mapTo[JarStoreServerAddress]
-      .map(address => new FileServer.Client(system, address.url))
-
-  /**
-   * This function will copy the remote file to local file system, called from client side.
-   * Blocks until the download has finished and throws if it failed or timed out.
-   *
-   * @param localFile The destination of file path
-   * @param remotePath The remote file path from JarStore
-   */
-  override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
-    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath")
-    val future = client.flatMap(_.download(remotePath, localFile))
-    // Await.result (not Await.ready) so that a failed download is reported to the caller
-    // instead of being silently dropped.
-    Await.result(future, transferTimeout)
-  }
-
-  /**
-   * This function will copy the local file to the remote JarStore, called from client side.
-   * Blocks until the upload has finished and throws if it failed or timed out.
-   *
-   * @param localFile The local file
-   * @return The path of the uploaded file as reported by the file server
-   */
-  override def copyFromLocal(localFile: File): FilePath = {
-    val future = client.flatMap(_.upload(localFile))
-    Await.result(future, transferTimeout)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
deleted file mode 100644
index 66bb9ba..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.util
-
-import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.stream.Materializer
-import akka.stream.scaladsl.FileIO
-import akka.util.ByteString
-
-/**
- * FileDirective is a set of Akka-http directive to upload/download
- * huge binary files to/from Akka-Http server.
- */
-object FileDirective {
-
-  // Form field name
-  type Name = String
-
-  val CHUNK_SIZE = 262144
-
-  /**
-   * File information after a file is uploaded to server.
-   *
-   * @param originFileName original file name when user upload it in browser.
-   * @param file file name after the file is saved to server.
-   * @param length the length of the file
-   */
-  case class FileInfo(originFileName: String, file: File, length: Long)
-
-  class Form(val fields: Map[Name, FormField]) {
-    // The uploaded file stored under fieldName; None when the field is missing
-    // or holds a plain value.
-    def getFile(fieldName: String): Option[FileInfo] = fields.get(fieldName) match {
-      case Some(Left(info)) => Option(info)
-      case _ => None
-    }
-
-    // The plain value stored under fieldName; None when the field is missing
-    // or holds an uploaded file.
-    def getValue(fieldName: String): Option[String] = fields.get(fieldName) match {
-      case Some(Right(text)) => Option(text)
-      case _ => None
-    }
-  }
-
-  type FormField = Either[FileInfo, String]
-
-  /**
-   * Directive that stores the uploaded files and extracts the submitted [[Form]],
-   * which maps each form field name to either a FileInfo or a plain value.
-   *
-   * Delegates to uploadFileTo with a null root directory, so File.createTempFile
-   * falls back to the default temporary-file directory.
-   */
-  def uploadFile: Directive1[Form] = uploadFileTo(null)
-
-  /**
-   * Stores the uploaded files under the given rootDirectory.
-   *
-   * @param rootDirectory directory to store the files.
-   * @return a directive extracting the [[Form]] once every part has been consumed
-   */
-  def uploadFileTo(rootDirectory: File): Directive1[Form] = {
-    Directive[Tuple1[Form]] { inner =>
-      extractMaterializer { implicit materializer =>
-        extractExecutionContext { implicit executor =>
-          uploadFileImpl(rootDirectory)(materializer, executor) { pendingForm =>
-            requestContext =>
-              // Run the inner route only after the whole form has been parsed.
-              pendingForm.flatMap(form => inner(Tuple1(form))(requestContext))
-          }
-        }
-      }
-    }
-  }
-
-  // Streams the given file to the client as application/octet-stream,
-  // reading it in chunks of CHUNK_SIZE bytes.
-  def downloadFile(file: File): Route = {
-    val contentLength = file.length
-    val fileBytes = FileIO.fromFile(file, CHUNK_SIZE)
-    complete(HttpEntity(MediaTypes.`application/octet-stream`, contentLength, fileBytes))
-  }
-
-  /**
-   * Consumes the multipart form one part at a time and folds the parts into a [[Form]].
-   *
-   * A part that carries a file name is written to a temp file created under rootDirectory
-   * and recorded as a FileInfo; any other part is read fully and recorded as a UTF-8 string.
-   * File parts with no content are dropped from the form.
-   *
-   * @param rootDirectory directory to create the temp files in; null is passed on to
-   *                      File.createTempFile as is
-   * @return a directive extracting the Future of the parsed form
-   */
-  private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext)
-    : Directive1[Future[Form]] = {
-    Directive[Tuple1[Future[Form]]] { inner =>
-      entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
-        val form = formdata.parts.mapAsync(1) { p =>
-          p.filename match {
-            case Some(originalName) =>
-              // Reserve the suffix, but keep only the base name so that a client-supplied
-              // name containing path separators cannot influence where the file is created.
-              val suffix = new File(originalName).getName
-              val targetPath = File.createTempFile(s"userfile_${p.name}_", suffix, rootDirectory)
-              p.entity.dataBytes.runWith(FileIO.toFile(targetPath)).map { result =>
-                if (result.count > 0) {
-                  val info = FileInfo(originalName, targetPath, result.count)
-                  Map[Name, FormField](p.name -> Left(info))
-                } else {
-                  // Nothing was uploaded for this field; the temp file is not reported
-                  // to anyone, so do not leave it behind.
-                  targetPath.delete()
-                  Map.empty[Name, FormField]
-                }
-              }
-            case None =>
-              p.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map { value =>
-                Map[Name, FormField](p.name -> Right(value.utf8String))
-              }
-          }
-        }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) =>
-          new Form(set.fields ++ value)
-        }
-
-        inner(Tuple1(form))
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
deleted file mode 100644
index 3a0faad..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.util
-
-import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.ServerBinding
-import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.Uri.{Path, Query}
-import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{FileIO, Sink, Source}
-import spray.json.DefaultJsonProtocol._
-import spray.json.JsonFormat
-
-import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.util.FileDirective._
-import org.apache.gearpump.util.FileServer.Port
-
-/**
- * A simple file server implemented with akka-http to store/fetch large
- * binary files.
- *
- * Routes:
- *  - /upload stores the uploaded file under rootDirectory and replies with the name
- *    it was stored as.
- *  - /download?file=NAME streams the file NAME from rootDirectory.
- *  - / serves a minimal HTML upload form.
- *
- * @param system actor system used to run the HTTP server
- * @param host host name to bind to
- * @param port port to bind to, passed unchanged to the HTTP binding
- * @param rootDirectory directory the files are stored in and served from
- */
-class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory: File) {
-  import system.dispatcher
-  implicit val actorSystem = system
-  implicit val materializer = ActorMaterializer()
-  implicit def ec: ExecutionContext = system.dispatcher
-
-  val route: Route = {
-    path("upload") {
-      uploadFileTo(rootDirectory) { form =>
-        // Reply with the stored name of the first file field found in the form.
-        val storedName = form.fields.values.collectFirst {
-          case Left(info) if info.file != null => info.file.getName
-        }
-
-        storedName match {
-          case Some(name) => complete(name)
-          case None => failWith(new Exception("File not found in the uploaded form"))
-        }
-      }
-    } ~
-      path("download") {
-        parameters("file") { file: String =>
-          // The name comes straight from the request: refuse anything that resolves
-          // outside rootDirectory (for example "../../etc/passwd").
-          val requested = new File(rootDirectory, file).getCanonicalFile
-          if (requested.toPath.startsWith(rootDirectory.getCanonicalFile.toPath)) {
-            downloadFile(requested)
-          } else {
-            complete(StatusCodes.Forbidden)
-          }
-        }
-      } ~
-      pathEndOrSingleSlash {
-        extractUri { uri =>
-          val upload = uri.withPath(Uri.Path("/upload")).toString()
-          val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
-            s"""
-            |
-            |<h2>Please specify a file to upload:</h2>
-            |<form action="$upload" enctype="multipart/form-data" method="post">
-            |<p>
-            |<input type="file" name="datafile" size="40">
-            |</p>
-            |<div>
-            |<input type="submit" value="Submit">
-            |</div>
-            |</form>
-          """.stripMargin)
-          complete(entity)
-        }
-      }
-  }
-
-  // Set by start; None as long as the server has not been asked to bind.
-  private var connection: Option[Future[ServerBinding]] = None
-
-  /**
-   * Binds the HTTP server to host and port.
-   *
-   * @return the port of the local address the server was bound to
-   */
-  def start: Future[Port] = {
-    val binding = Http().bindAndHandle(Route.handlerFlow(route), host, port)
-    connection = Some(binding)
-    binding.map(address => Port(address.localAddress.getPort))
-  }
-
-  /**
-   * Unbinds the HTTP server. Completes immediately when start has never been called.
-   */
-  def stop: Future[Unit] = {
-    connection match {
-      case Some(binding) => binding.flatMap(_.unbind())
-      case None => Future.successful(())
-    }
-  }
-}
-
-object FileServer {
-
-  implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply)
-
-  /** Port the file server has been bound to. */
-  case class Port(port: Int)
-
-  /**
-   * Client of [[org.apache.gearpump.util.FileServer]]
-   *
-   * @param system actor system used to run the HTTP requests
-   * @param host host name of the file server
-   * @param port port of the file server
-   */
-  class Client(system: ActorSystem, host: String, port: Int) {
-
-    /** Creates a client from a server URL, using only the host and port of that URL. */
-    def this(system: ActorSystem, url: String) = {
-      this(system, Uri(url).authority.host.address(), Uri(url).authority.port)
-    }
-
-    private implicit val actorSystem = system
-    private implicit val materializer = ActorMaterializer()
-    private implicit val ec = system.dispatcher
-
-    val server = Uri(s"http://$host:$port")
-    val httpClient = Http(system).outgoingConnection(server.authority.host.address(),
-      server.authority.port)
-
-    /**
-     * Uploads the file to the /upload endpoint of the server.
-     *
-     * @param file the local file to upload
-     * @return the path replied by the server; the Future fails with an IOException
-     *         when the server does not answer with a success status
-     */
-    def upload(file: File): Future[FilePath] = {
-      val target = server.withPath(Path("/upload"))
-
-      val request = entity(file).map { body =>
-        HttpRequest(HttpMethods.POST, uri = target, entity = body)
-      }
-
-      val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
-      response.flatMap { reply =>
-        // Always read the body, then decide: the body of an error response is an error
-        // description and must not be handed out as the path of a stored file.
-        Unmarshal(reply).to[String].flatMap { body =>
-          if (reply.status.isSuccess()) {
-            Future.successful(FilePath(body))
-          } else {
-            Future.failed[FilePath](new java.io.IOException(
-              s"Upload of ${file.getName} failed with status ${reply.status}: $body"))
-          }
-        }
-      }
-    }
-
-    /**
-     * Downloads a file from the /download endpoint of the server.
-     *
-     * NOTE: the response status is not inspected; the response body is written
-     * to saveAs whatever the status is.
-     *
-     * @param remoteFile path of the file as returned by upload
-     * @param saveAs local file the response body is written to
-     */
-    def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
-      val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
-      // Download file to local
-      val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head)
-      val downloaded = response.flatMap { response =>
-        response.entity.dataBytes.runWith(FileIO.toFile(saveAs))
-      }
-      downloaded.map(_ => ())
-    }
-
-    // Wraps the file into a multipart form with a single "uploadfile" part that carries
-    // the file name, and marshals that form into a request entity.
-    private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
-      val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
-        FileIO.fromFile(file, chunkSize = 100000))
-      val body = Source.single(
-        Multipart.FormData.BodyPart(
-          "uploadfile",
-          entity,
-          Map("filename" -> file.getName)))
-      val form = Multipart.FormData(body)
-
-      Marshal(form).to[RequestEntity]
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
new file mode 100644
index 0000000..e173a8a
--- /dev/null
+++ b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.local.LocalJarStore
+org.apache.gearpump.jarstore.dfs.DFSJarStore
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
deleted file mode 100644
index bf37316..0000000
--- a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.gearpump.jarstore.local.LocalJarStoreService
-org.apache.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
deleted file mode 100644
index 4b17951..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.util
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.google.common.io.Files
-import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.util.FileServer._
-
-/**
- * Starts a real FileServer on a free local port and checks that uploaded data can be
- * fetched back unchanged, and that fetching an unknown file yields no data.
- */
-class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
-
-  implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS)
-  val host = "localhost"
-  private val LOG = LogUtil.getLogger(getClass)
-
-  var system: ActorSystem = null
-
-  override def afterAll(): Unit = {
-    if (null != system) {
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-
-  override def beforeAll(): Unit = {
-    val config = TestUtil.DEFAULT_CONFIG
-    system = ActorSystem("FileServerSpec", config)
-  }
-
-  // Uploads the data through a temp file, which is removed even when the upload fails.
-  private def save(client: Client, data: Array[Byte]): FilePath = {
-    val file = File.createTempFile("fileserverspec", "test")
-    try {
-      FileUtils.writeByteArrayToFile(file, data)
-      import scala.concurrent.duration._
-      Await.result(client.upload(file), 30.seconds)
-    } finally {
-      file.delete()
-    }
-  }
-
-  // Downloads the remote file into a temp file and returns its content.
-  private def get(client: Client, remote: FilePath): Array[Byte] = {
-    val file = File.createTempFile("fileserverspec", "test")
-    try {
-      import scala.concurrent.duration._
-      Await.result(client.download(remote, file), 10.seconds)
-      FileUtils.readFileToByteArray(file)
-    } finally {
-      file.delete()
-    }
-  }
-
-  // Runs the test against a freshly started server, which is always stopped afterwards.
-  private def withFileServer(test: (File, Port) => Unit): Unit = {
-    val rootDir = Files.createTempDir()
-    val server = new FileServer(system, host, 0, rootDir)
-    try {
-      val port = Await.result(server.start, Duration(25, TimeUnit.SECONDS))
-      test(rootDir, port)
-    } finally {
-      // Await.ready: a failure to unbind must not hide the failure of the test itself.
-      Await.ready(server.stop, Duration(25, TimeUnit.SECONDS))
-      rootDir.delete()
-    }
-  }
-
-  "The file server" should {
-    "serve the data previously stored" in {
-      withFileServer { (rootDir, port) =>
-        LOG.info("start test web server on port " + port)
-
-        val sizes = List(1, 100, 1000000, 50000000)
-        val client = new Client(system, host, port.port)
-
-        sizes.foreach { size =>
-          val bytes = randomBytes(size)
-          val url = s"http://$host:${port.port}/$size"
-          val remote = save(client, bytes)
-          val fetchedBytes = get(client, remote)
-          assert(fetchedBytes sameElements bytes, s"fetched data is corrupted, $url, $rootDir")
-        }
-      }
-    }
-  }
-
-  "The file server" should {
-    "handle missed file" in {
-      withFileServer { (_, port) =>
-        val client = new Client(system, host, port.port)
-        val fetchedBytes = get(client, FilePath("noexist"))
-        assert(fetchedBytes.length == 0)
-      }
-    }
-  }
-
-  private def randomBytes(size: Int): Array[Byte] = {
-    val bytes = new Array[Byte](size)
-    (new java.util.Random()).nextBytes(bytes)
-    bytes
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 6b415bb..4ce3053 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -144,8 +144,6 @@ object Build extends sbt.Build {
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
       "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
-      "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
-      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
       "commons-logging" % "commons-logging" % commonsLoggingVersion,
       "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
       "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided"
@@ -184,6 +182,8 @@ object Build extends sbt.Build {
       "com.typesafe.akka" %% "akka-agent" % akkaVersion,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
       "com.typesafe.akka" %% "akka-kernel" % akkaVersion,
+      "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
+      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
       "org.scala-lang" % "scala-reflect" % scalaVersionNumber,
       "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",
       "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
@@ -310,8 +310,7 @@ object Build extends sbt.Build {
   lazy val services: Project = services_full.jvm.
     settings(serviceJvmSettings: _*)
     .settings(compile in Compile <<= (compile in Compile))
-    .dependsOn(streaming % "test->test;compile->compile",
-      daemon % "test->test;compile->compile;provided")
+    .dependsOn(streaming % "test->test;compile->compile")
 
   lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq(
     libraryDependencies ++= Seq(
@@ -445,7 +444,7 @@ object Build extends sbt.Build {
           "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided"
         )
       ))
-      .dependsOn(services % "test->test;compile->compile", core % "provided")
+      .dependsOn(services % "test->test;compile->compile", daemon % "provided", core % "provided")
       .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val external_hbase = Project(

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 4efb854..1c87653 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -29,9 +29,10 @@ object Pack extends sbt.Build {
     "${PROG_HOME}/lib/yarn/*"
   )
 
-  val applicationClassPath = daemonClassPath ++ Seq(
+  val applicationClassPath = Seq(
     // Current working directory
-    "."
+    ".",
+    "${PROG_HOME}/conf"
   )
 
   val serviceClassPath = Seq(

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
index 1ca2306..b217363 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
@@ -33,7 +33,7 @@ import org.apache.gearpump.cluster.ClientToMaster._
 import org.apache.gearpump.cluster.ClusterConfig
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
 import org.apache.gearpump.cluster.MasterToClient._
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective}
 import org.apache.gearpump.services.AppMasterService.Status
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
@@ -42,14 +42,14 @@ import org.apache.gearpump.streaming.appmaster.DagManager._
 import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
 import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
 import org.apache.gearpump.util.ActorUtil.{askActor, askAppMaster}
-import org.apache.gearpump.util.FileDirective._
+import FileDirective._
 import org.apache.gearpump.util.{Constants, Util}
 
 /**
  * Management service for AppMaster
  */
 class AppMasterService(val master: ActorRef,
-    val jarStore: JarStoreService, override val system: ActorSystem)
+    val jarStoreClient: JarStoreClient, override val system: ActorSystem)
   extends BasicService {
 
   private val systemConfig = system.settings.config
@@ -71,24 +71,24 @@ class AppMasterService(val master: ActorRef,
           val msg = java.net.URLDecoder.decode(args, "UTF-8")
           val dagOperation = read[DAGOperation](msg)
           (post & entity(as[Multipart.FormData])) { _ =>
-          uploadFile { form =>
-            val jar = form.getFile("jar").map(_.file)
+            uploadFile { form =>
+              val jar = form.getFileInfo("jar").map(_.file)
 
-            if (jar.nonEmpty) {
-              dagOperation match {
-                case replace: ReplaceProcessor =>
-                  val description = replace.newProcessorDescription.copy(jar =
-                    Util.uploadJar(jar.get, jarStore))
-                  val dagOperationWithJar = replace.copy(newProcessorDescription = description)
-                  replaceProcessor(dagOperationWithJar)
+              if (jar.nonEmpty) {
+                dagOperation match {
+                  case replace: ReplaceProcessor =>
+                    val description = replace.newProcessorDescription.copy(jar =
+                      Util.uploadJar(jar.get, jarStoreClient))
+                    val dagOperationWithJar = replace.copy(newProcessorDescription = description)
+                    replaceProcessor(dagOperationWithJar)
+                }
+              } else {
+                replaceProcessor(dagOperation)
               }
-            } else {
-              replaceProcessor(dagOperation)
             }
+          } ~ (post & entity(as[FormData])) { _ =>
+            replaceProcessor(dagOperation)
           }
-        } ~ (post & entity(as[FormData])) { _ =>
-          replaceProcessor(dagOperation)
-        }
         }
       } ~
       path("stallingtasks") {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index 32c9f08..0b8409f 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -40,19 +40,19 @@ import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig,
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.worker.WorkerSummary
 import org.apache.gearpump.cluster.{ClusterConfig, UserConfig}
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer}
 import org.apache.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription}
 import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
 import org.apache.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication}
 import org.apache.gearpump.util.ActorUtil._
-import org.apache.gearpump.util.FileDirective._
+import FileDirective._
 import org.apache.gearpump.util.{Constants, Graph, Util}
 
 /** Manages service for master node */
 class MasterService(val master: ActorRef,
-    val jarStore: JarStoreService, override val system: ActorSystem)
+    val jarStoreClient: JarStoreClient, override val system: ActorSystem)
   extends BasicService {
 
   import upickle.default.{read, write}
@@ -116,8 +116,8 @@ class MasterService(val master: ActorRef,
     path("submitapp") {
       post {
         uploadFile { form =>
-          val jar = form.getFile("jar").map(_.file)
-          val configFile = form.getFile("configfile").map(_.file)
+          val jar = form.getFileInfo("jar").map(_.file)
+          val configFile = form.getFileInfo("configfile").map(_.file)
           val configString = form.getValue("configstring").getOrElse("")
           val executorCount = form.getValue("executorcount").getOrElse("1").toInt
           val args = form.getValue("args").getOrElse("")
@@ -139,8 +139,8 @@ class MasterService(val master: ActorRef,
     path("submitstormapp") {
       post {
         uploadFile { form =>
-          val jar = form.getFile("jar").map(_.file)
-          val configFile = form.getFile("configfile").map(_.file)
+          val jar = form.getFileInfo("jar").map(_.file)
+          val configFile = form.getFileInfo("configfile").map(_.file)
           val args = form.getValue("args").getOrElse("")
           onComplete(Future(
             MasterService.submitStormApp(jar, configFile, args, systemConfig)
@@ -180,12 +180,12 @@ class MasterService(val master: ActorRef,
     } ~
     path("uploadjar") {
       uploadFile { form =>
-        val jar = form.getFile("jar").map(_.file)
+        val jar = form.getFileInfo("jar").map(_.file)
         if (jar.isEmpty) {
           complete(write(
             MasterService.Status(success = false, reason = "Jar file not found")))
         } else {
-          val jarFile = Util.uploadJar(jar.get, jarStore)
+          val jarFile = Util.uploadJar(jar.get, jarStoreClient)
           complete(write(jarFile))
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
index 7d67f60..d92972b 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
@@ -18,8 +18,6 @@
 
 package org.apache.gearpump.services
 
-import org.apache.gearpump.jarstore.local.LocalJarStoreService
-
 import scala.concurrent.Await
 import scala.concurrent.duration._
 
@@ -31,7 +29,7 @@ import akka.stream.ActorMaterializer
 import akka.util.Timeout
 import org.apache.commons.lang.exception.ExceptionUtils
 
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.JarStoreClient
 import org.apache.gearpump.util.{Constants, LogUtil}
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
@@ -46,16 +44,7 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem
 
   private val config = system.settings.config
 
-  // only LocalJarStoreService is supported now for "Compose DAG"
-  // since DFSJarStoreService requires HDFS to be on the classpath.
-  // Note this won't affect users  "Submit Gearpump Application" through
-  // dashboard with "jarstore.rootpath" set to HDFS.
-  if (!JarStoreService.get(config).isInstanceOf[LocalJarStoreService]) {
-    LOG.warn("only local jar store is supported for Compose DAG")
-  }
-  private val jarStoreService = new LocalJarStoreService
-  jarStoreService.init(config, system)
-
+  private val jarStoreClient = new JarStoreClient(config, system)
 
   private val securityEnabled = config.getBoolean(
     Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
@@ -101,9 +90,9 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem
   private def services: RouteService = {
 
     val admin = new AdminService(system)
-    val masterService = new MasterService(master, jarStoreService, system)
+    val masterService = new MasterService(master, jarStoreClient, system)
     val worker = new WorkerService(master, system)
-    val app = new AppMasterService(master, jarStoreService, system)
+    val app = new AppMasterService(master, jarStoreClient, system)
     val sup = new SupervisorService(master, supervisor, system)
 
     new RouteService {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
index 2ece554..cec7367 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
@@ -36,7 +36,7 @@ import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMaste
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
 import org.apache.gearpump.cluster.MasterToClient._
 import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
 import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
 import org.apache.gearpump.util.LogUtil
 // NOTE: This cannot be removed!!!
@@ -47,19 +47,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
 
   override def testConfig: Config = TestUtil.UI_CONFIG
 
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private def actorRefFactory = system
-
   val mockAppMaster = TestProbe()
   val failure = LastFailure(System.currentTimeMillis(), "Some error")
-
-  lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
-  def jarStore: JarStoreService = jarStoreService
+  val jarStoreClient = new JarStoreClient(system.settings.config, system)
 
   private def master = mockMaster.ref
 
-  private def appMasterRoute = new AppMasterService(master, jarStore, system).route
+  private def appMasterRoute = new AppMasterService(master, jarStoreClient, system).route
 
   mockAppMaster.setAutoPilot {
     new AutoPilot {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
index e365e9f..39c0de0 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
@@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersD
 import org.apache.gearpump.cluster.MasterToClient._
 import org.apache.gearpump.cluster.TestUtil
 import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
 import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
@@ -53,17 +53,13 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
 
   override def testConfig: Config = TestUtil.UI_CONFIG
 
-  private def actorRefFactory = system
   val workerId = 0
   val mockWorker = TestProbe()
 
-  lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
+  val jarStoreClient = new JarStoreClient(system.settings.config, system)
   private def master = mockMaster.ref
 
-  def jarStore: JarStoreService = jarStoreService
-
-  private def masterRoute = new MasterService(master, jarStore, system).route
+  private def masterRoute = new MasterService(master, jarStoreClient, system).route
 
   mockWorker.setAutoPilot {
     new AutoPilot {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
index 4658c98..b0e2101 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
@@ -34,7 +34,6 @@ import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWor
 import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
 import org.apache.gearpump.cluster.TestUtil
 import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.jarstore.JarStoreService
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
 
@@ -49,8 +48,6 @@ class WorkerServiceSpec
 
   protected def master = mockMaster.ref
 
-  lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
   protected def workerRoute = new WorkerService(master, system).route
 
   mockWorker.setAutoPilot {



[2/2] incubator-gearpump git commit: fix GEARPUMP-205 remove hdfs dependency from gear's classpath

Posted by ma...@apache.org.
fix GEARPUMP-205 remove hdfs dependency from gear's classpath

raise the pr to use travis UT

Author: huafengw <fv...@gmail.com>

Closes #81 from huafengw/blob.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f8f91664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f8f91664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f8f91664

Branch: refs/heads/master
Commit: f8f916645cfb9b83767c9d3c3912d04825f38636
Parents: 6852b56
Author: huafengw <fv...@gmail.com>
Authored: Wed Sep 7 12:17:52 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Sep 7 12:17:52 2016 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/client/ClientContext.scala |   8 +-
 .../gearpump/cluster/main/AppSubmitter.scala    | 105 +++++++++++
 .../org/apache/gearpump/cluster/main/Gear.scala |  80 +++++++++
 .../org/apache/gearpump/cluster/main/Info.scala |  52 ++++++
 .../org/apache/gearpump/cluster/main/Kill.scala |  49 ++++++
 .../gearpump/cluster/main/MainRunner.scala      |  42 +++++
 .../apache/gearpump/cluster/main/Replay.scala   |  47 +++++
 .../gearpump/jarstore/FileDirective.scala       | 172 +++++++++++++++++++
 .../apache/gearpump/jarstore/FileServer.scala   | 160 +++++++++++++++++
 .../org/apache/gearpump/jarstore/JarStore.scala |  82 +++++++++
 .../gearpump/jarstore/JarStoreClient.scala      |  73 ++++++++
 .../gearpump/jarstore/JarStoreServer.scala      |  52 ++++++
 .../gearpump/jarstore/JarStoreService.scala     |  86 ----------
 .../gearpump/jarstore/local/LocalJarStore.scala |  72 ++++++++
 .../scala/org/apache/gearpump/util/Util.scala   |   6 +-
 .../gearpump/jarstore/FileServerSpec.scala      | 129 ++++++++++++++
 .../org.apache.gearpump.jarstore.JarStore       |  20 +++
 ...org.apache.gearpump.jarstore.JarStoreService |  20 ---
 .../gearpump/cluster/main/AppSubmitter.scala    | 106 ------------
 .../org/apache/gearpump/cluster/main/Gear.scala |  81 ---------
 .../org/apache/gearpump/cluster/main/Info.scala |  53 ------
 .../org/apache/gearpump/cluster/main/Kill.scala |  50 ------
 .../gearpump/cluster/main/MainRunner.scala      |  43 -----
 .../apache/gearpump/cluster/main/Replay.scala   |  48 ------
 .../apache/gearpump/cluster/master/Master.scala |  10 +-
 .../apache/gearpump/cluster/worker/Worker.scala |  11 +-
 .../gearpump/jarstore/dfs/DFSJarStore.scala     |  67 ++++++++
 .../jarstore/dfs/DFSJarStoreService.scala       |  76 --------
 .../gearpump/jarstore/local/LocalJarStore.scala |  64 -------
 .../jarstore/local/LocalJarStoreService.scala   |  81 ---------
 .../apache/gearpump/util/FileDirective.scala    | 140 ---------------
 .../org/apache/gearpump/util/FileServer.scala   | 167 ------------------
 .../org.apache.gearpump.jarstore.JarStore       |  20 +++
 ...org.apache.gearpump.jarstore.JarStoreService |  20 ---
 .../apache/gearpump/util/FileServerSpec.scala   | 120 -------------
 project/Build.scala                             |   9 +-
 project/Pack.scala                              |   5 +-
 .../gearpump/services/AppMasterService.scala    |  34 ++--
 .../gearpump/services/MasterService.scala       |  18 +-
 .../apache/gearpump/services/RestServices.scala |  19 +-
 .../services/AppMasterServiceSpec.scala         |  12 +-
 .../gearpump/services/MasterServiceSpec.scala   |  10 +-
 .../gearpump/services/WorkerServiceSpec.scala   |   3 -
 43 files changed, 1279 insertions(+), 1243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index 245f1bc..0cba079 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -33,7 +33,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFrom
 import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult
 import org.apache.gearpump.cluster._
 import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
 import org.apache.gearpump.util.Constants._
 import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
 
@@ -59,8 +59,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
   LOG.info(s"Starting system ${system.name}")
   val shouldCleanupSystem = Option(sys).isEmpty
 
-  private val jarStoreService = JarStoreService.get(config)
-  jarStoreService.init(config, system)
+  private val jarStoreClient = new JarStoreClient(config, system)
 
   private lazy val master: ActorRef = {
     val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
@@ -140,8 +139,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
 
   private def loadFile(jarPath: String): AppJar = {
     val jarFile = new java.io.File(jarPath)
-    val path = jarStoreService.copyFromLocal(jarFile)
-    AppJar(jarFile.getName, path)
+    Util.uploadJar(jarFile, jarStoreClient)
   }
 
   private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
new file mode 100644
index 0000000..b2eef7d
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import java.io.File
+import java.net.{URL, URLClassLoader}
+import java.util.jar.JarFile
+
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+import org.slf4j.Logger
+
+/** Tool to submit an application jar to cluster */
+object AppSubmitter extends AkkaApp with ArgumentsParser {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val ignoreUnknownArgument = true
+
+  override val description = "Submit an application to Master by providing a jar"
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
+      defaultValue = Some("")),
+    "jar" -> CLIOption("<application>.jar", required = true),
+    "executors" -> CLIOption[Int]("number of executor to launch", required = false,
+      defaultValue = Some(1)),
+    "verbose" -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)),
+    // For documentation purposes only; the OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  /** Loads the jar given by "jar" and invokes the "main" method of its main class. */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    if (null != config) {
+      if (config.getBoolean("verbose")) LogUtil.verboseLogToConsole()
+
+      val jar = config.getString("jar")
+
+      // Set jar path to be submitted to cluster
+      System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
+      System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
+
+      val namePrefix = config.getString("namePrefix")
+      if (namePrefix.nonEmpty) {
+        if (!Util.validApplicationName(namePrefix)) {
+          throw new Exception(s"$namePrefix is not a valid prefix for an application name")
+        }
+        System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
+      }
+
+      val jarFile = new java.io.File(jar)
+      if (!jarFile.exists()) {
+        throw new Exception(s"jar $jar does not exist")
+      }
+
+      // File.toURI escapes characters such as '#' or '%' that "file:" + path leaves as is.
+      val classLoader: URLClassLoader = new URLClassLoader(Array(jarFile.toURI.toURL),
+        Thread.currentThread().getContextClassLoader)
+      val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
+
+      // Set to context classLoader. ActorSystem picks the context classLoader in preference
+      Thread.currentThread().setContextClassLoader(classLoader)
+      // Start main class
+      val clazz = classLoader.loadClass(main)
+      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+      mainMethod.invoke(null, arguments)
+    }
+  }
+
+  /** Main class is the first remaining argument if any, else Main-Class of the jar manifest. */
+  private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
+    : (String, Array[String]) = {
+    if (remainArgs.length > 0) {
+      classLoader.loadClass(remainArgs(0))
+      (remainArgs(0), remainArgs.drop(1))
+    } else {
+      val jarFile = new JarFile(jar)
+      // getManifest is null for a jar without manifest; always close the JarFile again.
+      val mainInManifest = try {
+        Option(jarFile.getManifest).map(_.getMainAttributes.getValue("Main-Class"))
+      } finally jarFile.close()
+      mainInManifest.filter(name => name != null && name.nonEmpty) match {
+        case Some(main) => (main, remainArgs)
+        case None => throw new Exception("No main class specified")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
new file mode 100644
index 0000000..1511469
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.util.{Constants, LogUtil}
+import org.slf4j.Logger
+
+/** Entry point of the "Gear" shell command; dispatches to the sub-commands in `commands`. */
+object Gear {
+
+  val OPTION_CONFIG = "conf"
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
+    "info" -> Info, "replay" -> Replay, "main" -> MainRunner)
+
+  /** Prints the sorted names of the known sub-commands to stderr. */
+  def usage(): Unit = {
+    val keys = commands.keys.toList.sorted
+    // scalastyle:off println
+    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+    // scalastyle:on println
+  }
+
+  /** Runs a known sub-command; anything else goes to MainRunner, command name included. */
+  private def executeCommand(command: String, commandArgs: Array[String]): Unit = {
+    commands.get(command) match {
+      case Some(app) => app.main(commandArgs)
+      case None => MainRunner.main(command +: commandArgs)
+    }
+  }
+
+  /** Applies the "-conf" option, then runs the first remaining argument as the command. */
+  def main(inputArgs: Array[String]): Unit = {
+    val (configFile, args) = extractConfig(inputArgs)
+    if (configFile != null) {
+      // Sets custom config file...
+      System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
+    }
+
+    if (args.isEmpty) usage() else executeCommand(args(0), args.drop(1))
+  }
+
+  /** Splits "-conf <file>" out of the arguments, which otherwise keep their order.
+   *  The returned file is null when the option is absent. */
+  private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = {
+    val option = s"-$OPTION_CONFIG"
+    val result = Array.newBuilder[String]
+    var configFile: String = null
+    var index = 0
+    while (index < inputArgs.length) {
+      if (inputArgs(index) == option) {
+        // A trailing "-conf" used to fail with ArrayIndexOutOfBoundsException.
+        require(index + 1 < inputArgs.length, s"Option $option requires a config file path")
+        index += 1
+        configFile = inputArgs(index)
+      } else {
+        result += inputArgs(index)
+      }
+      index += 1
+    }
+    (configFile, result.result())
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
new file mode 100644
index 0000000..e1fe291
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+/** Tool to query master info */
+object Info extends AkkaApp with ArgumentsParser {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    // For documentation purposes only; the OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  override val description = "Query the Application list"
+
+  // scalastyle:off println
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val client = ClientContext(akkaConf)
+    try {
+      val AppMastersData(appMasters) = client.listApps
+      Console.println("== Application Information ==")
+      Console.println("====================================")
+      appMasters.foreach { appData =>
+        Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " +
+          s"status: ${appData.status}, worker: ${appData.workerPath}")
+      }
+    } finally client.close() // also runs when the query or the printing fails
+  }
+  // scalastyle:on println
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
new file mode 100644
index 0000000..8ecaf85
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+/** Tool to kill an App */
+object Kill extends AkkaApp with ArgumentsParser {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "appid" -> CLIOption("<application id>", required = true),
+    // For document purpose only, OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  override val description = "Kill an application with application Id"
+
+  /**
+   * Parses `args` and calls `shutdown` on a [[ClientContext]] for the application given by
+   * the `appid` option. Nothing is done when `parse` yields null.
+   *
+   * @param akkaConf configuration handed to the [[ClientContext]]
+   * @param args command line arguments, see `options`
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    if (null != config) {
+      // Read the id before the client exists, so a failure here cannot leak the client.
+      val appId = config.getInt("appid")
+      val client = ClientContext(akkaConf)
+      try {
+        LOG.info(s"Shutting down application $appId")
+        client.shutdown(appId)
+      } finally {
+        // Close the client even if the shutdown request throws.
+        client.close()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
new file mode 100644
index 0000000..8664232
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+/** Tool to run any main class by providing a jar */
+object MainRunner extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    // For document purpose only, OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  /**
+   * Loads the class named by the first argument through the context class loader of the
+   * current thread and invokes its static `main(Array[String])` with the remaining arguments.
+   *
+   * @param akkaConf not read by this tool
+   * @param args the main class name followed by the arguments to pass to it
+   * @throws IllegalArgumentException if `args` is empty
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+    require(args.nonEmpty, "Missing main class; expected: <main class> [arguments...]")
+
+    val mainClazz = args.head
+    val commandArgs = args.drop(1)
+
+    val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz)
+    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+    // commandArgs is passed as the single Array[String] parameter of the target main method.
+    mainMethod.invoke(null, commandArgs)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
new file mode 100644
index 0000000..e648d61
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+// Internal tool to restart an application
+object Replay extends AkkaApp with ArgumentsParser {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "appid" -> CLIOption("<application id>", required = true),
+    // For document purpose only, OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  override val description = "Replay the application from current min clock(low watermark)"
+
+  /**
+   * Parses `args` and calls `replayFromTimestampWindowTrailingEdge` on a [[ClientContext]]
+   * for the application given by the `appid` option. Nothing is done when `parse` yields null.
+   *
+   * @param akkaConf configuration handed to the [[ClientContext]]
+   * @param args command line arguments, see `options`
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    if (null != config) {
+      // Read the id before the client exists, so a failure here cannot leak the client.
+      val appId = config.getInt("appid")
+      val client = ClientContext(akkaConf)
+      try {
+        client.replayFromTimestampWindowTrailingEdge(appId)
+      } finally {
+        // Close the client even if the replay request throws.
+        client.close()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
new file mode 100644
index 0000000..969da04
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.time.Instant
+import scala.concurrent.{ExecutionContext, Future}
+
+import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.stream.Materializer
+import akka.stream.scaladsl.{StreamConverters, FileIO}
+import akka.util.ByteString
+
+
+/**
+ * FileDirective is a set of Akka-http directive to upload/download
+ * huge binary files to/from Akka-Http server.
+ */
+object FileDirective {
+
+  // Form field name
+  type Name = String
+
+  // Chunk size handed to StreamConverters.fromInputStream when a file is served for download.
+  val CHUNK_SIZE = 262144
+
+  /**
+   * File information after a file is uploaded to server.
+   *
+   * @param originFileName original file name when user upload it in browser.
+   * @param file file name after the file is saved to server.
+   * @param length the length of the file
+   */
+  case class FileInfo(originFileName: String, file: File, length: Long)
+
+  /**
+   * Parsed multipart form. Every field is either an uploaded file (Left) or a plain
+   * text value (Right), see [[FormField]].
+   */
+  class Form(val fields: Map[Name, FormField]) {
+    /** Returns the uploaded file stored under `fieldName`, or None for a text or absent field. */
+    def getFileInfo(fieldName: String): Option[FileInfo] = {
+      fields.get(fieldName).flatMap {
+        case Left(file) => Option(file)
+        case Right(_) => None
+      }
+    }
+
+    /** Returns the text value stored under `fieldName`, or None for a file or absent field. */
+    def getValue(fieldName: String): Option[String] = {
+      fields.get(fieldName).flatMap {
+        case Left(_) => None
+        case Right(value) => Option(value)
+      }
+    }
+  }
+
+  type FormField = Either[FileInfo, String]
+
+  /**
+   * Store the uploaded files to temporary directory, and return a Map from form field name
+   * to FileInfo.
+   */
+  def uploadFile: Directive1[Form] = {
+    Directive[Tuple1[Form]] { inner =>
+      extractMaterializer {implicit mat =>
+        extractExecutionContext {implicit ec =>
+          uploadFileImpl(mat, ec) { formFuture =>
+            ctx => {
+              formFuture.map(form => inner(Tuple1(form))).flatMap(route => route(ctx))
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Store the uploaded files to JarStore, and return a Map from form field name
+   * to FilePath in JarStore.
+   */
+  def uploadFileTo(jarStore: JarStore): Directive1[Map[Name, FilePath]] = {
+    Directive[Tuple1[Map[Name, FilePath]]] { inner =>
+      extractMaterializer {implicit mat =>
+        extractExecutionContext {implicit ec =>
+          uploadFileImpl(jarStore)(mat, ec) { filesFuture =>
+            ctx => {
+              filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx))
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Completes the request with the content of `filePath` read from `jarStore`, streamed as
+   * `application/octet-stream`.
+   *
+   * NOTE(review): `filePath` is passed to `jarStore.getFile` unchanged; if it comes from a
+   * request parameter the JarStore implementation has to reject paths outside its root.
+   */
+  def downloadFileFrom(jarStore: JarStore, filePath: String): Route = {
+    val responseEntity = HttpEntity(
+      MediaTypes.`application/octet-stream`,
+      StreamConverters.fromInputStream(
+        () => jarStore.getFile(filePath), CHUNK_SIZE
+      ))
+    complete(responseEntity)
+  }
+
+  /**
+   * Streams every file part of the multipart form into `jarStore` and collects the
+   * resulting field name to [[FilePath]] mapping. Parts without a file name and files
+   * for which no byte was written are left out of the map.
+   */
+  private def uploadFileImpl(jarStore: JarStore)
+    (implicit mat: Materializer, ec: ExecutionContext): Directive1[Future[Map[Name, FilePath]]] = {
+    Directive[Tuple1[Future[Map[Name, FilePath]]]] { inner =>
+      entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
+        val fileNameMap = formdata.parts.mapAsync(1) { p =>
+          p.filename match {
+            case Some(fileName) =>
+              // The file name is chosen by the uploader: keep only its last path component
+              // so that it cannot point outside of the store. The timestamp prefix is kept
+              // from the original naming scheme.
+              val path = s"${Instant.now().toEpochMilli}${new File(fileName).getName}"
+              val sink = StreamConverters.fromOutputStream(() => jarStore.createFile(path),
+                autoFlush = true)
+              p.entity.dataBytes.runWith(sink).map { written =>
+                if (written.count > 0) {
+                  Map(p.name -> FilePath(path))
+                } else {
+                  Map.empty[Name, FilePath]
+                }
+              }
+            case None =>
+              // Not a file: the value is not needed, but the bytes of a streamed part still
+              // have to be consumed, otherwise the following parts may never be delivered.
+              p.entity.dataBytes.runWith(akka.stream.scaladsl.Sink.ignore)
+                .map(_ => Map.empty[Name, FilePath])
+          }
+        }.runFold(Map.empty[Name, FilePath])((set, value) => set ++ value)
+        inner(Tuple1(fileNameMap))
+      }
+    }
+  }
+
+  /**
+   * Reads the multipart form into a [[Form]]: file parts are written to temporary files
+   * created with `File.createTempFile`, other parts are decoded as UTF-8 text. Files for
+   * which no byte was written are left out of the form.
+   */
+  private def uploadFileImpl(implicit mat: Materializer, ec: ExecutionContext)
+    : Directive1[Future[Form]] = {
+    Directive[Tuple1[Future[Form]]] { inner =>
+      entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
+        val form = formdata.parts.mapAsync(1) { p =>
+          p.filename match {
+            case Some(fileName) =>
+              val targetPath = File.createTempFile(s"userfile_${p.name}_", fileName)
+              val writtenFuture = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
+              writtenFuture.map { written =>
+                if (written.count > 0) {
+                  Map[Name, FormField](
+                    p.name -> Left(FileInfo(fileName, targetPath, written.count)))
+                } else {
+                  Map.empty[Name, FormField]
+                }
+              }
+            case None =>
+              // Concatenate all chunks of the part before decoding it.
+              val valueFuture = p.entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
+              valueFuture.map { value =>
+                Map[Name, FormField](p.name -> Right(value.utf8String))
+              }
+          }
+        }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) =>
+          new Form(set.fields ++ value)
+        }
+
+        inner(Tuple1(form))
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
new file mode 100644
index 0000000..4ce8f2d
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import scala.concurrent.{ExecutionContext, Future}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.Http.ServerBinding
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model.Uri.{Path, Query}
+import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{FileIO, Sink, Source}
+import spray.json.DefaultJsonProtocol._
+import spray.json.JsonFormat
+
+import org.apache.gearpump.jarstore.FileDirective._
+import org.apache.gearpump.jarstore.FileServer.Port
+
+/**
+ * A simple file server implemented with akka-http to store/fetch large
+ * binary files.
+ */
+class FileServer(system: ActorSystem, host: String, port: Int = 0, jarStore: JarStore) {
+  import system.dispatcher
+  implicit val actorSystem = system
+  implicit val materializer = ActorMaterializer()
+  implicit def ec: ExecutionContext = system.dispatcher
+
+  /**
+   * Routes served by this server:
+   *  - `/upload` stores the uploaded form in `jarStore` and answers with the stored path,
+   *  - `/download?file=...` streams the named file back from `jarStore`,
+   *  - `/` answers with a small HTML upload form.
+   */
+  val route: Route = {
+    path("upload") {
+      uploadFileTo(jarStore) { form =>
+        // Only the first stored file is reported back to the caller.
+        form.headOption match {
+          case Some((_, uploadedFilePath)) =>
+            complete(uploadedFilePath.path)
+          case None =>
+            failWith(new Exception("File not found in the uploaded form"))
+        }
+      }
+    } ~
+      path("download") {
+        parameters("file") { file: String =>
+          // NOTE(review): `file` comes straight from the query string; confirm that the
+          // JarStore implementation rejects paths outside of its root.
+          downloadFileFrom(jarStore, file)
+        }
+      } ~
+      pathEndOrSingleSlash {
+        extractUri { uri =>
+          val upload = uri.withPath(Uri.Path("/upload")).toString()
+          val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
+            s"""
+            |
+            |<h2>Please specify a file to upload:</h2>
+            |<form action="$upload" enctype="multipart/form-data" method="post">
+            |<input type="file" name="datafile" size="40">
+            |</p>
+            |<div>
+            |<input type="submit" value="Submit">
+            |</div>
+            |</form>
+          """.stripMargin)
+          complete(entity)
+        }
+      }
+  }
+
+  // Binding created by start; stays null until start has been called.
+  private var connection: Future[ServerBinding] = null
+
+  /**
+   * Binds the server to `host` and `port`.
+   *
+   * @return the port reported by the binding's local address
+   */
+  def start: Future[Port] = {
+    connection = Http().bindAndHandle(Route.handlerFlow(route), host, port)
+    connection.map(address => Port(address.localAddress.getPort))
+  }
+
+  /**
+   * Unbinds the server. Completes immediately when the server was never started.
+   */
+  def stop: Future[Unit] = {
+    if (connection == null) {
+      // Nothing was bound, so there is nothing to release.
+      Future.successful(())
+    } else {
+      connection.flatMap(_.unbind())
+    }
+  }
+}
+
+object FileServer {
+
+  implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply)
+
+  /** Port a [[FileServer]] is listening on, as returned by `FileServer.start`. */
+  case class Port(port: Int)
+
+  /**
+   * Client of [[org.apache.gearpump.jarstore.FileServer]]
+   */
+  class Client(system: ActorSystem, host: String, port: Int) {
+
+    /** Creates a client from a server URL; host and port are taken from its authority. */
+    def this(system: ActorSystem, url: String) = {
+      this(system, Uri(url).authority.host.address(), Uri(url).authority.port)
+    }
+
+    private implicit val actorSystem = system
+    private implicit val materializer = ActorMaterializer()
+    private implicit val ec = system.dispatcher
+
+    val server = Uri(s"http://$host:$port")
+    val httpClient = Http(system).outgoingConnection(server.authority.host.address(),
+      server.authority.port)
+
+    /**
+     * Posts `file` to the `/upload` path of the server.
+     *
+     * @return the path answered by the server; the future fails with an IOException when
+     *         the server does not answer with a success status
+     */
+    def upload(file: File): Future[FilePath] = {
+      val target = server.withPath(Path("/upload"))
+
+      val request = entity(file).map { entity =>
+        HttpRequest(HttpMethods.POST, uri = target, entity = entity)
+      }
+
+      val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
+      response.flatMap(requireSuccess(_, s"upload ${file.getName}")).flatMap { ok =>
+        Unmarshal(ok).to[String]
+      }.map { path =>
+        FilePath(path)
+      }
+    }
+
+    /**
+     * Requests `remoteFile` from the `/download` path of the server and writes the
+     * response body to `saveAs`.
+     *
+     * @return a future completed once the body has been written; it fails with an
+     *         IOException when the server does not answer with a success status
+     */
+    def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
+      val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
+      // Download file to local
+      val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head)
+      val downloaded = response.flatMap(requireSuccess(_, s"download ${remoteFile.path}"))
+        .flatMap { ok =>
+          ok.entity.dataBytes.runWith(FileIO.toFile(saveAs))
+        }
+      downloaded.map(_ => ())
+    }
+
+    // Passes a successful response through. For any other status the body is discarded and
+    // the future fails, so that an error page is never mistaken for the requested payload.
+    private def requireSuccess(response: HttpResponse, action: String): Future[HttpResponse] = {
+      if (response.status.isSuccess) {
+        Future.successful(response)
+      } else {
+        response.entity.dataBytes.runWith(Sink.ignore).flatMap { _ =>
+          Future.failed[HttpResponse](new java.io.IOException(
+            s"Failed to $action, server $server answered with status ${response.status}"))
+        }
+      }
+    }
+
+    // Wraps the file into a multipart form with a single "uploadfile" field.
+    private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
+      val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+        FileIO.fromFile(file, chunkSize = 100000))
+      val body = Source.single(
+        Multipart.FormData.BodyPart(
+          "uploadfile",
+          entity,
+          Map("filename" -> file.getName)))
+      val form = Multipart.FormData(body)
+
+      Marshal(form).to[RequestEntity]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala
new file mode 100644
index 0000000..a4a411f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.{InputStream, OutputStream}
+import java.net.URI
+import java.util.ServiceLoader
+
+import com.typesafe.config.Config
+import org.apache.gearpump.util.Util
+
+import scala.collection.JavaConverters._
+
+/** Path of a file stored in a [[JarStore]]. */
+case class FilePath(path: String)
+
+/**
+ * JarStore is used to manage the upload/download of binary files,
+ * like user submitted application jar.
+ */
+trait JarStore {
+  /**
+   * The scheme of the JarStore.
+   * Like "hdfs" for HDFS file system, and "file" for a local
+   * file system.
+   */
+  val scheme: String
+
+  /**
+   * Init the Jar Store.
+   *
+   * @param config configuration the implementation reads its settings from.
+   */
+  def init(config: Config): Unit
+
+  /**
+   * Creates the file on JarStore.
+   *
+   * @param fileName  name of the file to be created on JarStore.
+   * @return OutputStream returns a stream into which the data can be written.
+   */
+  def createFile(fileName: String): OutputStream
+
+  /**
+   * Gets the InputStream to read the file
+   *
+   * @param fileName name of the file to be read on JarStore.
+   * @return InputStream returns a stream from which the data can be read.
+   */
+  def getFile(fileName: String): InputStream
+}
+
+object JarStore {
+
+  // Every JarStore implementation found through java.util.ServiceLoader, loaded on first use.
+  private lazy val jarstores: List[JarStore] = {
+    ServiceLoader.load(classOf[JarStore]).asScala.toList
+  }
+
+  /**
+   * Get an active JarStore for a root path. The scheme of the resolved root path selects
+   * the implementation.
+   *
+   * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for
+   * more information.
+   *
+   * @param rootPath root path of the jar store.
+   * @return the registered JarStore whose scheme equals the scheme of the resolved path.
+   * @throws NoSuchElementException if no registered JarStore has that scheme.
+   */
+  def get(rootPath: String): JarStore = {
+    val scheme = new URI(Util.resolvePath(rootPath)).getScheme
+    jarstores.find(_.scheme == scheme).getOrElse {
+      throw new NoSuchElementException(
+        s"No JarStore is registered for scheme '$scheme' (root path: $rootPath)")
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala
new file mode 100644
index 0000000..59cc405
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.Await
+
+import akka.pattern.ask
+import akka.actor.{ActorSystem, ActorRef}
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.util.{Util, Constants, LogUtil}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import scala.concurrent.{Future, ExecutionContext}
+
+class JarStoreClient(config: Config, system: ActorSystem) {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  private implicit def dispatcher: ExecutionContext = system.dispatcher
+
+  // Proxy actor for the masters listed under GEARPUMP_CLUSTER_MASTERS.
+  private val master: ActorRef = {
+    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
+    system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
+  }
+
+  // Asks the master for the jar store server address on first use and builds a client for it.
+  // mapTo fails the future on an unexpected reply instead of casting it unchecked.
+  private lazy val client: Future[FileServer.Client] = (master ? GetJarStoreServer)
+    .mapTo[JarStoreServerAddress]
+    .map(address => new FileServer.Client(system, address.url))
+
+  /**
+   * This function will copy the remote file to local file system, called from client side.
+   * Blocks for up to 60 seconds and rethrows the failure of the download, if any.
+   *
+   * @param localFile The destination of file path
+   * @param remotePath The remote file path from JarStore
+   */
+  def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
+    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath")
+    val future = client.flatMap(_.download(remotePath, localFile))
+    // Await.result, unlike Await.ready, surfaces a failed download to the caller.
+    Await.result(future, Duration(60, TimeUnit.SECONDS))
+  }
+
+  /**
+   * This function will copy the local file to the remote JarStore, called from client side.
+   * Blocks for up to 60 seconds.
+   *
+   * @param localFile The local file
+   * @return the path of the uploaded file as answered by the jar store server
+   */
+  def copyFromLocal(localFile: File): FilePath = {
+    val future = client.flatMap(_.upload(localFile))
+    Await.result(future, Duration(60, TimeUnit.SECONDS))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala
new file mode 100644
index 0000000..1fb0de5
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore
+
+import akka.actor.{Actor, Stash}
+import akka.pattern.pipe
+
+import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import org.apache.gearpump.util._
+
+/**
+ * Actor that starts a [[FileServer]] backed by the [[JarStore]] selected by
+ * `jarStoreRootPath` and answers `GetJarStoreServer` with the URL of that server.
+ * Messages received before the server is bound are stashed.
+ */
+class JarStoreServer(jarStoreRootPath: String) extends Actor with Stash {
+  private val LOG = LogUtil.getLogger(getClass)
+  private val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
+  private val jarStore = JarStore.get(jarStoreRootPath)
+  jarStore.init(context.system.settings.config)
+  private val server = new FileServer(context.system, host, 0, jarStore)
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+  implicit val executionContext = context.dispatcher
+
+  // The result of the bind is delivered to this actor as a message.
+  server.start pipeTo self
+
+  // Initial state: waits for the bind result.
+  def receive: Receive = {
+    case FileServer.Port(port) =>
+      context.become(listen(port))
+      unstashAll()
+    case akka.actor.Status.Failure(cause) =>
+      // pipeTo reports a failed bind as Status.Failure. Without this case the failure would
+      // be stashed like any other message and the actor would never answer a request.
+      LOG.error(s"Failed to start the jar store file server on host $host", cause)
+      context.stop(self)
+    case _ =>
+      stash()
+  }
+
+  // State entered once the file server is bound to `port`.
+  def listen(port: Int): Receive = {
+    case GetJarStoreServer =>
+      sender() ! JarStoreServerAddress(s"http://$host:$port/")
+  }
+
+  override def postStop(): Unit = {
+    server.stop
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
deleted file mode 100644
index 0ba9558..0000000
--- a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.jarstore
-
-import java.io.File
-import java.net.URI
-import java.util.ServiceLoader
-import scala.collection.JavaConverters._
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-
-import org.apache.gearpump.util.{Constants, Util}
-
-case class FilePath(path: String)
-
-/**
- * JarStoreService is used to manage the upload/download of binary files,
- * like user submitted application jar.
- */
-trait JarStoreService {
-  /**
-   * The scheme of the JarStoreService.
-   * Like "hdfs" for HDFS file system, and "file" for a local
-   * file system.
-   */
-  val scheme: String
-
-  /**
-   * Init the Jar Store.
-   */
-  def init(config: Config, system: ActorSystem)
-
-  /**
-   * This function will copy the local file to the remote JarStore, called from client side.
-   * @param localFile The local file
-   */
-  def copyFromLocal(localFile: File): FilePath
-
-  /**
-   * This function will copy the remote file to local file system, called from client side.
-   *
-   * @param localFile The destination of file path
-   * @param remotePath The remote file path from JarStore
-   */
-  def copyToLocalFile(localFile: File, remotePath: FilePath)
-}
-
-object JarStoreService {
-
-  /**
-   * Get a active JarStoreService by specifying a scheme.
-   *
-   * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for
-   * more information.
-   */
-  def get(config: Config): JarStoreService = {
-    val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
-    get(jarStoreRootPath)
-  }
-
-  private lazy val jarstoreServices: List[JarStoreService] = {
-    ServiceLoader.load(classOf[JarStoreService]).asScala.toList
-  }
-
-  private def get(rootPath: String): JarStoreService = {
-    val scheme = new URI(Util.resolvePath(rootPath)).getScheme
-    jarstoreServices.find(_.scheme == scheme).get
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
new file mode 100644
index 0000000..c15a9be
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore.local
+
+import java.io._
+
+import com.typesafe.config.Config
+import org.apache.gearpump.jarstore.JarStore
+import org.apache.gearpump.util.{LogUtil, FileUtils, Constants}
+import org.slf4j.Logger
+
+/**
+ * LocalJarStore stores the uploaded jars on the local disk, under the configured root path.
+ */
+class LocalJarStore extends JarStore {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private var rootPath: String = null
+  override val scheme: String = "file"
+
+  class ClosedInputStream extends InputStream {
+    override def read(): Int = -1
+  }
+
+  override def init(config: Config): Unit = {
+    rootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+    FileUtils.forceMkdir(new File(rootPath))
+  }
+
+  /**
+   * Creates the file on JarStore.
+   *
+   * @param fileName  name of the file to be created on JarStore.
+   * @return OutputStream returns a stream into which the data can be written.
+   */
+  override def createFile(fileName: String): OutputStream = {
+    val localFile = new File(rootPath, fileName)
+    new FileOutputStream(localFile)
+  }
+
+  /**
+   * Gets the InputStream to read the file.
+   *
+   * @param fileName name of the file to be read on JarStore.
+   * @return a stream to read the data from; an empty stream if the file cannot be opened.
+   */
+  override def getFile(fileName: String): InputStream = {
+    val localFile = new File(rootPath, fileName)
+    try {
+      new FileInputStream(localFile)
+    } catch {
+      case ex: Exception =>
+        // Pass ex to the logger; interpolating getStackTrace prints only an array reference.
+        LOG.error(s"Fetch file $fileName failed", ex)
+        new ClosedInputStream
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala b/core/src/main/scala/org/apache/gearpump/util/Util.scala
index 19bd5a8..0faa46a 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Util.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala
@@ -27,7 +27,7 @@ import scala.util.{Failure, Success, Try}
 import com.typesafe.config.{Config, ConfigFactory}
 
 import org.apache.gearpump.cluster.AppJar
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
 import org.apache.gearpump.transport.HostPort
 
 object Util {
@@ -123,8 +123,8 @@ object Util {
     }
   }
 
-  def uploadJar(jarFile: File, jarStoreService: JarStoreService): AppJar = {
-    val remotePath = jarStoreService.copyFromLocal(jarFile)
+  def uploadJar(jarFile: File, jarStoreClient: JarStoreClient): AppJar = {
+    val remotePath = jarStoreClient.copyFromLocal(jarFile)
     AppJar(jarFile.getName, remotePath)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
new file mode 100644
index 0000000..c99a031
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import akka.actor.ActorSystem
+import com.typesafe.config.{ConfigValueFactory, ConfigValue}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.google.common.io.Files
+import org.apache.gearpump.jarstore.local.LocalJarStore
+import org.apache.gearpump.util.{FileUtils, LogUtil}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.apache.gearpump.jarstore.FileServer._
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/** Checks that FileServer round-trips uploads and serves a missing file as empty content. */
+class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS)
+  val host = "localhost"
+  private val LOG = LogUtil.getLogger(getClass)
+
+  var system: ActorSystem = null
+
+  override def afterAll {
+    if (null != system) {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+
+  override def beforeAll {
+    system = ActorSystem("FileServerSpec", TestUtil.DEFAULT_CONFIG)
+  }
+
+  private def save(client: Client, data: Array[Byte]): FilePath = {
+    val file = File.createTempFile("fileserverspec", "test")
+    FileUtils.writeByteArrayToFile(file, data)
+    val future = client.upload(file)
+    import scala.concurrent.duration._
+    val path = Await.result(future, 30.seconds)
+    file.delete()
+    path
+  }
+
+  private def get(client: Client, remote: FilePath): Array[Byte] = {
+    val file = File.createTempFile("fileserverspec", "test")
+    val future = client.download(remote, file)
+    import scala.concurrent.duration._
+    Await.result(future, 10.seconds) // block until the download finishes before reading the file
+    val bytes = FileUtils.readFileToByteArray(file)
+    file.delete()
+    bytes
+  }
+
+  "The file server" should {
+    "serve the data previously stored" in {
+      val rootDir = Files.createTempDir()
+      val localJarStore: JarStore = new LocalJarStore
+      val conf = TestUtil.DEFAULT_CONFIG.withValue("gearpump.jarstore.rootpath",
+        ConfigValueFactory.fromAnyRef(rootDir.getAbsolutePath))
+      localJarStore.init(conf)
+
+      val server = new FileServer(system, host, 0, localJarStore)
+      val port = Await.result(server.start, Duration(25, TimeUnit.SECONDS))
+
+      LOG.info("start test web server on port " + port)
+
+      val sizes = List(1, 100, 1000000, 50000000)
+      val client = new Client(system, host, port.port)
+
+      sizes.foreach { size =>
+        val bytes = randomBytes(size)
+        val url = s"http://$host:${port.port}/$size"
+        val remote = save(client, bytes)
+        val fetchedBytes = get(client, remote)
+        assert(fetchedBytes sameElements bytes, s"fetch data is coruppted, $url, $rootDir")
+      }
+      server.stop
+      rootDir.delete()
+    }
+  }
+
+  "The file server" should {
+    "handle missed file" in {
+      val rootDir = Files.createTempDir()
+      val localJarStore: JarStore = new LocalJarStore
+      val conf = TestUtil.DEFAULT_CONFIG.withValue("gearpump.jarstore.rootpath",
+        ConfigValueFactory.fromAnyRef(rootDir.getAbsolutePath))
+      localJarStore.init(conf)
+
+      val server = new FileServer(system, host, 0, localJarStore)
+      val port = Await.result(server.start, Duration(25, TimeUnit.SECONDS))
+
+      val client = new Client(system, host, port.port)
+      val fetchedBytes = get(client, FilePath("noexist"))
+      assert(fetchedBytes.length == 0)
+      // Stop the server here as the first case does, so it is not left running after the test.
+      server.stop
+      rootDir.delete()
+    }
+  }
+
+  private def randomBytes(size: Int): Array[Byte] = {
+    val bytes = new Array[Byte](size)
+    new java.util.Random().nextBytes(bytes)
+    bytes
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
new file mode 100644
index 0000000..e173a8a
--- /dev/null
+++ b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.local.LocalJarStore
+org.apache.gearpump.jarstore.dfs.DFSJarStore
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
deleted file mode 100644
index bf37316..0000000
--- a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.gearpump.jarstore.local.LocalJarStoreService
-org.apache.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
deleted file mode 100644
index d0de51a..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import java.io.File
-import java.net.{URL, URLClassLoader}
-import java.util.jar.JarFile
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
-
-/** Tool to submit an application jar to cluster */
-object AppSubmitter extends AkkaApp with ArgumentsParser {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val ignoreUnknownArgument = true
-
-  override val description = "Submit an application to Master by providing a jar"
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
-      defaultValue = Some("")),
-    "jar" -> CLIOption("<application>.jar", required = true),
-    "executors" -> CLIOption[Int]("number of executor to launch", required = false,
-      defaultValue = Some(1)),
-    "verbose" -> CLIOption("<print verbose log on console>", required = false,
-      defaultValue = Some(false)),
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    val config = parse(args)
-    if (null != config) {
-
-      val verbose = config.getBoolean("verbose")
-      if (verbose) {
-        LogUtil.verboseLogToConsole()
-      }
-
-      val jar = config.getString("jar")
-
-      // Set jar path to be submitted to cluster
-      System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
-      System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
-
-      val namePrefix = config.getString("namePrefix")
-      if (namePrefix.nonEmpty) {
-        if (!Util.validApplicationName(namePrefix)) {
-          throw new Exception(s"$namePrefix is not a valid prefix for an application name")
-        }
-        System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
-      }
-
-      val jarFile = new java.io.File(jar)
-
-      // Start main class
-      if (!jarFile.exists()) {
-        throw new Exception(s"jar $jar does not exist")
-      }
-
-      val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" +
-        jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader)
-      val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
-
-      // Set to context classLoader. ActorSystem pick context classLoader in preference
-      Thread.currentThread().setContextClassLoader(classLoader)
-      val clazz = classLoader.loadClass(main)
-      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
-      mainMethod.invoke(null, arguments)
-    }
-  }
-
-  private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
-    : (String, Array[String]) = {
-    val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.
-      getValue("Main-Class")).getOrElse("")
-
-    if (remainArgs.length > 0) {
-      classLoader.loadClass(remainArgs(0))
-      (remainArgs(0), remainArgs.drop(1))
-    } else if (mainInManifest.nonEmpty) {
-      (mainInManifest, remainArgs)
-    } else {
-      throw new Exception("No main class specified")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
deleted file mode 100644
index 672fee6..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.{Constants, LogUtil}
-
-object Gear {
-
-  val OPTION_CONFIG = "conf"
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
-    "info" -> Info, "replay" -> Replay, "main" -> MainRunner)
-
-  def usage(): Unit = {
-    val keys = commands.keys.toList.sorted
-    // scalastyle:off println
-    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
-    // scalastyle:on println
-  }
-
-  private def executeCommand(command: String, commandArgs: Array[String]) = {
-    commands.get(command).map(_.main(commandArgs))
-    if (!commands.contains(command)) {
-      val allArgs = (command +: commandArgs.toList).toArray
-      MainRunner.main(allArgs)
-    }
-  }
-
-  def main(inputArgs: Array[String]): Unit = {
-    val (configFile, args) = extractConfig(inputArgs)
-    if (configFile != null) {
-      // Sets custom config file...
-      System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
-    }
-
-    if (args.length == 0) {
-      usage()
-    } else {
-      val command = args(0)
-      val commandArgs = args.drop(1)
-      executeCommand(command, commandArgs)
-    }
-  }
-
-  private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = {
-    var index = 0
-
-    var result = List.empty[String]
-    var configFile: String = null
-    while (index < inputArgs.length) {
-      val item = inputArgs(index)
-      if (item == s"-$OPTION_CONFIG") {
-        index += 1
-        configFile = inputArgs(index)
-      } else {
-        result = result :+ item
-      }
-      index += 1
-    }
-    (configFile, result.toArray)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
deleted file mode 100644
index bf444a3..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to query master info */
-object Info extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  override val description = "Query the Application list"
-
-  // scalastyle:off println
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val client = ClientContext(akkaConf)
-
-    val AppMastersData(appMasters) = client.listApps
-    Console.println("== Application Information ==")
-    Console.println("====================================")
-    appMasters.foreach { appData =>
-      Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " +
-        s"status: ${appData.status}, worker: ${appData.workerPath}")
-    }
-    client.close()
-  }
-  // scalastyle:on println
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
deleted file mode 100644
index 17f6214..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to kill an App */
-object Kill extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "appid" -> CLIOption("<application id>", required = true),
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  override val description = "Kill an application with application Id"
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-
-    if (null != config) {
-      val client = ClientContext(akkaConf)
-      LOG.info("Client ")
-      client.shutdown(config.getInt("appid"))
-      client.close()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
deleted file mode 100644
index c6c9f10..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to run any main class by providing a jar */
-object MainRunner extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val mainClazz = args(0)
-    val commandArgs = args.drop(1)
-
-    val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz)
-    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
-    mainMethod.invoke(null, commandArgs)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
deleted file mode 100644
index d721832..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-// Internal tool to restart an application
-object Replay extends AkkaApp with ArgumentsParser {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "appid" -> CLIOption("<application id>", required = true),
-    // For document purpose only, OPTION_CONFIG option is not used here.
-    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
-      defaultValue = None))
-
-  override val description = "Replay the application from current min clock(low watermark)"
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-
-    if (null != config) {
-      val client = ClientContext(akkaConf)
-      client.replayFromTimestampWindowTrailingEdge(config.getInt("appid"))
-      client.close()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
index 762cf27..6b4df07 100644
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -20,6 +20,7 @@ package org.apache.gearpump.cluster.master
 
 import java.lang.management.ManagementFactory
 import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.jarstore.JarStoreServer
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable
@@ -40,7 +41,6 @@ import org.apache.gearpump.cluster.WorkerToMaster._
 import org.apache.gearpump.cluster.master.InMemoryKVService._
 import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
 import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.apache.gearpump.jarstore.local.LocalJarStore
 import org.apache.gearpump.metrics.Metrics.ReportMetrics
 import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
 import org.apache.gearpump.transport.HostPort
@@ -79,11 +79,7 @@ private[cluster] class Master extends Actor with Stash {
 
   val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
 
-  private val jarStore = if (Util.isLocalPath(jarStoreRootPath)) {
-    Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath)))
-  } else {
-    None
-  }
+  private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))
 
   private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
 
@@ -162,7 +158,7 @@ private[cluster] class Master extends Actor with Stash {
 
   def jarStoreService: Receive = {
     case GetJarStoreServer =>
-      jarStore.foreach(_ forward GetJarStoreServer)
+      jarStore forward GetJarStoreServer
   }
 
   def kvServiceMsgHandler: Receive = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
index ff74368..1b52e5d 100644
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -43,7 +43,7 @@ import org.apache.gearpump.cluster.WorkerToMaster._
 import org.apache.gearpump.cluster.master.Master.MasterInfo
 import org.apache.gearpump.cluster.scheduler.Resource
 import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
 import org.apache.gearpump.metrics.Metrics.ReportMetrics
 import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
 import org.apache.gearpump.util.ActorSystemBooter.Daemon
@@ -69,8 +69,7 @@ private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutS
   private var masterInfo: MasterInfo = null
   private var executorNameToActor = Map.empty[String, ActorRef]
   private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
-  private val jarStoreService = JarStoreService.get(systemConfig)
-  jarStoreService.init(systemConfig, context.system)
+  private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
 
   private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
   private val resourceUpdateTimeoutMs = 30000 // Milliseconds
@@ -171,7 +170,7 @@ private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutS
         val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
 
         val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
-          jarStoreService, executorProcLauncher))
+          jarStoreClient, executorProcLauncher))
         executorNameToActor += actorName -> executor
 
         resource = resource - launch.resource
@@ -339,7 +338,7 @@ private[cluster] object Worker {
       launch: LaunchExecutor,
       masterInfo: MasterInfo,
       ioPool: ExecutionContext,
-      jarStoreService: JarStoreService,
+      jarStoreClient: JarStoreClient,
       procLauncher: ExecutorProcessLauncher) extends Actor {
     import launch.{appId, executorId, resource}
 
@@ -407,7 +406,7 @@ private[cluster] object Worker {
       val process = Future {
         val jarPath = ctx.jar.map { appJar =>
           val tempFile = File.createTempFile(appJar.name, ".jar")
-          jarStoreService.copyToLocalFile(tempFile, appJar.filePath)
+          jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
           val file = new URL("file:" + tempFile)
           file.getFile
         }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
new file mode 100644
index 0000000..ebaf354
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore.dfs
+
+import java.io.{InputStream, OutputStream}
+
+import com.typesafe.config.Config
+import org.apache.gearpump.jarstore.JarStore
+import org.apache.gearpump.util.Constants
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+
+/**
+ * DFSJarStore stores the uploaded jars on HDFS.
+ */
+class DFSJarStore extends JarStore {
+  private var rootPath: Path = null // assigned by init; stays null until init has run
+  override val scheme: String = "hdfs"
+
+  override def init(config: Config): Unit = { // the root path is read from config
+    rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
+    val fs = rootPath.getFileSystem(new Configuration())
+    if (!fs.exists(rootPath)) { // root missing: create it, requesting rwx for all users
+      fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    }
+  }
+
+  /**
+   * Creates a file under the root path set by init and opens it for writing.
+   *
+   * @param fileName name of the file to be created, resolved against the root path.
+   * @return an OutputStream into which the file's data can be written.
+   */
+  override def createFile(fileName: String): OutputStream = {
+    val filePath = new Path(rootPath, fileName)
+    val fs = filePath.getFileSystem(new Configuration())
+    fs.create(filePath)
+  }
+
+  /**
+   * Opens a file under the root path set by init for reading.
+   *
+   * @param fileName name of the file to be read, resolved against the root path.
+   * @return an InputStream from which the file's data can be read.
+   */
+  override def getFile(fileName: String): InputStream = {
+    val filePath = new Path(rootPath, fileName)
+    val fs = filePath.getFileSystem(new Configuration())
+    fs.open(filePath)
+  }
+}