You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/09/07 04:18:22 UTC
[1/2] incubator-gearpump git commit: fix GEARPUMP-205 remove hdfs
dependency from gear's classpath
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 6852b56e9 -> f8f916645
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
deleted file mode 100644
index 7a60019..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.jarstore.dfs
-
-import java.io.File
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.slf4j.Logger
-
-import org.apache.gearpump.jarstore.{FilePath, JarStoreService}
-import org.apache.gearpump.util.{Constants, LogUtil}
-
-/**
- * DFSJarStoreService store the uploaded jar on HDFS
- */
-class DFSJarStoreService extends JarStoreService {
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private var rootPath: Path = null
-
- override val scheme: String = "hdfs"
-
- override def init(config: Config, actorRefFactory: ActorSystem): Unit = {
- rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
- val fs = rootPath.getFileSystem(new Configuration())
- if (!fs.exists(rootPath)) {
- fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
- }
- }
-
- /**
- * This function will copy the remote file to local file system, called from client side.
- *
- * @param localFile The destination of file path
- * @param remotePath The remote file path from JarStore
- */
- override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
- val filePath = new Path(rootPath, remotePath.path)
- val fs = filePath.getFileSystem(new Configuration())
- LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${filePath.toString}")
- val target = new Path(localFile.toURI().toString)
- fs.copyToLocalFile(filePath, target)
- }
-
- /**
- * This function will copy the local file to the remote JarStore, called from client side.
- *
- * @param localFile The local file
- */
- override def copyFromLocal(localFile: File): FilePath = {
- val remotePath = FilePath(Math.abs(new java.util.Random().nextLong()).toString)
- val filePath = new Path(rootPath, remotePath.path)
- val fs = filePath.getFileSystem(new Configuration())
- LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${filePath.toString}")
- fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath)
- remotePath
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
deleted file mode 100644
index 9bd7071..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.jarstore.local
-
-import java.io.File
-
-import akka.actor.{Actor, Stash}
-import akka.pattern.pipe
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
-import org.apache.gearpump.util._
-
-/**
- * LocalJarStore store the uploaded jar on local disk.
- */
-class LocalJarStore(rootDirPath: String) extends Actor with Stash {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
- val rootDirectory = new File(rootDirPath)
-
- FileUtils.forceMkdir(rootDirectory)
-
- val server = new FileServer(context.system, host, 0, rootDirectory)
-
- implicit val timeout = Constants.FUTURE_TIMEOUT
- implicit val executionContext = context.dispatcher
-
- server.start pipeTo self
-
- def receive: Receive = {
- case FileServer.Port(port) =>
- context.become(listen(port))
- unstashAll()
- case _ =>
- stash()
- }
-
- def listen(port: Int): Receive = {
- case GetJarStoreServer =>
- sender ! JarStoreServerAddress(s"http://$host:$port/")
- }
-
- override def postStop(): Unit = {
- server.stop
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
deleted file mode 100644
index 1ab103f..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.jarstore.local
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.jarstore.{FilePath, JarStoreService}
-import org.apache.gearpump.util._
-
-/**
- * LocalJarStoreService store the uploaded jar on local disk.
- */
-class LocalJarStoreService extends JarStoreService {
- private def LOG: Logger = LogUtil.getLogger(getClass)
- private implicit val timeout = Constants.FUTURE_TIMEOUT
- private var system: akka.actor.ActorSystem = null
- private var master: ActorRef = null
- private implicit def dispatcher: ExecutionContext = system.dispatcher
-
- override val scheme: String = "file"
-
- override def init(config: Config, system: ActorSystem): Unit = {
- this.system = system
- val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
- .asScala.flatMap(Util.parseHostList)
- master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
- }
-
- private lazy val client = (master ? GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]]
- .map { address =>
- val client = new FileServer.Client(system, address.url)
- client
- }
-
- /**
- * This function will copy the remote file to local file system, called from client side.
- *
- * @param localFile The destination of file path
- * @param remotePath The remote file path from JarStore
- */
- override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
- LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath")
- val future = client.flatMap(_.download(remotePath, localFile))
- Await.ready(future, Duration(60, TimeUnit.SECONDS))
- }
-
- /**
- * This function will copy the local file to the remote JarStore, called from client side.
- * @param localFile The local file
- */
- override def copyFromLocal(localFile: File): FilePath = {
- val future = client.flatMap(_.upload(localFile))
- Await.result(future, Duration(60, TimeUnit.SECONDS))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
deleted file mode 100644
index 66bb9ba..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.util
-
-import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.stream.Materializer
-import akka.stream.scaladsl.FileIO
-import akka.util.ByteString
-
-/**
- * FileDirective is a set of Akka-http directive to upload/download
- * huge binary files to/from Akka-Http server.
- */
-object FileDirective {
-
- // Form field name
- type Name = String
-
- val CHUNK_SIZE = 262144
-
- /**
- * File information after a file is uploaded to server.
- *
- * @param originFileName original file name when user upload it in browser.
- * @param file file name after the file is saved to server.
- * @param length the length of the file
- */
- case class FileInfo(originFileName: String, file: File, length: Long)
-
- class Form(val fields: Map[Name, FormField]) {
- def getFile(fieldName: String): Option[FileInfo] = {
- fields.get(fieldName).flatMap {
- case Left(file) => Option(file)
- case Right(_) => None
- }
- }
-
- def getValue(fieldName: String): Option[String] = {
- fields.get(fieldName).flatMap {
- case Left(_) => None
- case Right(value) => Option(value)
- }
- }
- }
-
- type FormField = Either[FileInfo, String]
-
- /**
- * directive to uploadFile, it store the uploaded files
- * to temporary directory, and return a Map from form field name
- * to FileInfo.
- */
- def uploadFile: Directive1[Form] = {
- uploadFileTo(null)
- }
-
- /**
- * Store the uploaded files to specific rootDirectory.
- *
- * @param rootDirectory directory to store the files.
- * @return
- */
- def uploadFileTo(rootDirectory: File): Directive1[Form] = {
- Directive[Tuple1[Form]] { inner =>
- extractMaterializer {implicit mat =>
- extractExecutionContext {implicit ec =>
- uploadFileImpl(rootDirectory)(mat, ec) { filesFuture =>
- ctx => {
- filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx))
- }
- }
- }
- }
- }
- }
-
- // Downloads file from server
- def downloadFile(file: File): Route = {
- val responseEntity = HttpEntity(
- MediaTypes.`application/octet-stream`,
- file.length,
- FileIO.fromFile(file, CHUNK_SIZE))
- complete(responseEntity)
- }
-
- private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext)
- : Directive1[Future[Form]] = {
- Directive[Tuple1[Future[Form]]] { inner =>
- entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
- val form = formdata.parts.mapAsync(1) { p =>
- if (p.filename.isDefined) {
-
- // Reserve the suffix
- val targetPath = File.createTempFile(s"userfile_${p.name}_",
- s"${p.filename.getOrElse("")}", rootDirectory)
- val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
- written.map(written =>
- if (written.count > 0) {
- Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count)))
- } else {
- Map.empty[Name, FormField]
- })
- } else {
- val valueFuture = p.entity.dataBytes.runFold(ByteString.empty) {(total, input) =>
- total ++ input
- }
- valueFuture.map{value =>
- Map(p.name -> Right(value.utf8String))
- }
- }
- }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) =>
- new Form(set.fields ++ value)
- }
-
- inner(Tuple1(form))
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
deleted file mode 100644
index 3a0faad..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.util
-
-import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.ServerBinding
-import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.Uri.{Path, Query}
-import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{FileIO, Sink, Source}
-import spray.json.DefaultJsonProtocol._
-import spray.json.JsonFormat
-
-import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.util.FileDirective._
-import org.apache.gearpump.util.FileServer.Port
-
-/**
- * A simple file server implemented with akka-http to store/fetch large
- * binary files.
- */
-class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory: File) {
- import system.dispatcher
- implicit val actorSystem = system
- implicit val materializer = ActorMaterializer()
- implicit def ec: ExecutionContext = system.dispatcher
-
- val route: Route = {
- path("upload") {
- uploadFileTo(rootDirectory) { form =>
- val fileName = form.fields.headOption.flatMap { pair =>
- val (_, fileInfo) = pair
- fileInfo match {
- case Left(file) => Option(file.file).map(_.getName)
- case Right(_) => None
- }
- }
-
- if (fileName.isDefined) {
- complete(fileName.get)
- } else {
- failWith(new Exception("File not found in the uploaded form"))
- }
- }
- } ~
- path("download") {
- parameters("file") { file: String =>
- downloadFile(new File(rootDirectory, file))
- }
- } ~
- pathEndOrSingleSlash {
- extractUri { uri =>
- val upload = uri.withPath(Uri.Path("/upload")).toString()
- val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
- s"""
- |
- |<h2>Please specify a file to upload:</h2>
- |<form action="$upload" enctype="multipart/form-data" method="post">
- |<input type="file" name="datafile" size="40">
- |</p>
- |<div>
- |<input type="submit" value="Submit">
- |</div>
- |</form>
- """.stripMargin)
- complete(entity)
- }
- }
- }
-
- private var connection: Future[ServerBinding] = null
-
- def start: Future[Port] = {
- connection = Http().bindAndHandle(Route.handlerFlow(route), host, port)
- connection.map(address => Port(address.localAddress.getPort))
- }
-
- def stop: Future[Unit] = {
- connection.flatMap(_.unbind())
- }
-}
-
-object FileServer {
-
- implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply)
-
- case class Port(port: Int)
-
- /**
- * Client of [[org.apache.gearpump.util.FileServer]]
- */
- class Client(system: ActorSystem, host: String, port: Int) {
-
- def this(system: ActorSystem, url: String) = {
- this(system, Uri(url).authority.host.address(), Uri(url).authority.port)
- }
-
- private implicit val actorSystem = system
- private implicit val materializer = ActorMaterializer()
- private implicit val ec = system.dispatcher
-
- val server = Uri(s"http://$host:$port")
- val httpClient = Http(system).outgoingConnection(server.authority.host.address(),
- server.authority.port)
-
- def upload(file: File): Future[FilePath] = {
- val target = server.withPath(Path("/upload"))
-
- val request = entity(file).map { entity =>
- HttpRequest(HttpMethods.POST, uri = target, entity = entity)
- }
-
- val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
- response.flatMap { some =>
- Unmarshal(some).to[String]
- }.map { path =>
- FilePath(path)
- }
- }
-
- def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
- val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
- // Download file to local
- val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head)
- val downloaded = response.flatMap { response =>
- response.entity.dataBytes.runWith(FileIO.toFile(saveAs))
- }
- downloaded.map(written => Unit)
- }
-
- private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
- val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
- FileIO.fromFile(file, chunkSize = 100000))
- val body = Source.single(
- Multipart.FormData.BodyPart(
- "uploadfile",
- entity,
- Map("filename" -> file.getName)))
- val form = Multipart.FormData(body)
-
- Marshal(form).to[RequestEntity]
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
new file mode 100644
index 0000000..e173a8a
--- /dev/null
+++ b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.local.LocalJarStore
+org.apache.gearpump.jarstore.dfs.DFSJarStore
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
deleted file mode 100644
index bf37316..0000000
--- a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.gearpump.jarstore.local.LocalJarStoreService
-org.apache.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
deleted file mode 100644
index 4b17951..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.util
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.google.common.io.Files
-import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.util.FileServer._
-
-class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
-
- implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS)
- val host = "localhost"
- private val LOG = LogUtil.getLogger(getClass)
-
- var system: ActorSystem = null
-
- override def afterAll {
- if (null != system) {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
- }
-
- override def beforeAll {
- val config = TestUtil.DEFAULT_CONFIG
- system = ActorSystem("FileServerSpec", config)
- }
-
- private def save(client: Client, data: Array[Byte]): FilePath = {
- val file = File.createTempFile("fileserverspec", "test")
- FileUtils.writeByteArrayToFile(file, data)
- val future = client.upload(file)
- import scala.concurrent.duration._
- val path = Await.result(future, 30.seconds)
- file.delete()
- path
- }
-
- private def get(client: Client, remote: FilePath): Array[Byte] = {
- val file = File.createTempFile("fileserverspec", "test")
- val future = client.download(remote, file)
- import scala.concurrent.duration._
- val data = Await.result(future, 10.seconds)
-
- val bytes = FileUtils.readFileToByteArray(file)
- file.delete()
- bytes
- }
-
- "The file server" should {
- "serve the data previously stored" in {
-
- val rootDir = Files.createTempDir()
-
- val server = new FileServer(system, host, 0, rootDir)
- val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS))
-
- LOG.info("start test web server on port " + port)
-
- val sizes = List(1, 100, 1000000, 50000000)
- val client = new Client(system, host, port.port)
-
- sizes.foreach { size =>
- val bytes = randomBytes(size)
- val url = s"http://$host:${port.port}/$size"
- val remote = save(client, bytes)
- val fetchedBytes = get(client, remote)
- assert(fetchedBytes sameElements bytes, s"fetch data is coruppted, $url, $rootDir")
- }
- server.stop
- rootDir.delete()
- }
- }
-
- "The file server" should {
- "handle missed file" in {
-
- val rootDir = Files.createTempDir()
-
- val server = new FileServer(system, host, 0, rootDir)
- val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS))
-
- val client = new Client(system, host, port.port)
- val fetchedBytes = get(client, FilePath("noexist"))
- assert(fetchedBytes.length == 0)
- rootDir.delete()
- }
- }
-
- private def randomBytes(size: Int): Array[Byte] = {
- val bytes = new Array[Byte](size)
- (new java.util.Random()).nextBytes(bytes)
- bytes
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 6b415bb..4ce3053 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -144,8 +144,6 @@ object Build extends sbt.Build {
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
- "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
- "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
"commons-logging" % "commons-logging" % commonsLoggingVersion,
"com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided"
@@ -184,6 +182,8 @@ object Build extends sbt.Build {
"com.typesafe.akka" %% "akka-agent" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-kernel" % akkaVersion,
+ "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
+ "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
"org.scala-lang" % "scala-reflect" % scalaVersionNumber,
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
@@ -310,8 +310,7 @@ object Build extends sbt.Build {
lazy val services: Project = services_full.jvm.
settings(serviceJvmSettings: _*)
.settings(compile in Compile <<= (compile in Compile))
- .dependsOn(streaming % "test->test;compile->compile",
- daemon % "test->test;compile->compile;provided")
+ .dependsOn(streaming % "test->test;compile->compile")
lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq(
libraryDependencies ++= Seq(
@@ -445,7 +444,7 @@ object Build extends sbt.Build {
"org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided"
)
))
- .dependsOn(services % "test->test;compile->compile", core % "provided")
+ .dependsOn(services % "test->test;compile->compile", daemon % "provided", core % "provided")
.disablePlugins(sbtassembly.AssemblyPlugin)
lazy val external_hbase = Project(
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 4efb854..1c87653 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -29,9 +29,10 @@ object Pack extends sbt.Build {
"${PROG_HOME}/lib/yarn/*"
)
- val applicationClassPath = daemonClassPath ++ Seq(
+ val applicationClassPath = Seq(
// Current working directory
- "."
+ ".",
+ "${PROG_HOME}/conf"
)
val serviceClassPath = Seq(
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
index 1ca2306..b217363 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
@@ -33,7 +33,7 @@ import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
import org.apache.gearpump.cluster.MasterToClient._
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective}
import org.apache.gearpump.services.AppMasterService.Status
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
@@ -42,14 +42,14 @@ import org.apache.gearpump.streaming.appmaster.DagManager._
import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
import org.apache.gearpump.util.ActorUtil.{askActor, askAppMaster}
-import org.apache.gearpump.util.FileDirective._
+import FileDirective._
import org.apache.gearpump.util.{Constants, Util}
/**
* Management service for AppMaster
*/
class AppMasterService(val master: ActorRef,
- val jarStore: JarStoreService, override val system: ActorSystem)
+ val jarStoreClient: JarStoreClient, override val system: ActorSystem)
extends BasicService {
private val systemConfig = system.settings.config
@@ -71,24 +71,24 @@ class AppMasterService(val master: ActorRef,
val msg = java.net.URLDecoder.decode(args, "UTF-8")
val dagOperation = read[DAGOperation](msg)
(post & entity(as[Multipart.FormData])) { _ =>
- uploadFile { form =>
- val jar = form.getFile("jar").map(_.file)
+ uploadFile { form =>
+ val jar = form.getFileInfo("jar").map(_.file)
- if (jar.nonEmpty) {
- dagOperation match {
- case replace: ReplaceProcessor =>
- val description = replace.newProcessorDescription.copy(jar =
- Util.uploadJar(jar.get, jarStore))
- val dagOperationWithJar = replace.copy(newProcessorDescription = description)
- replaceProcessor(dagOperationWithJar)
+ if (jar.nonEmpty) {
+ dagOperation match {
+ case replace: ReplaceProcessor =>
+ val description = replace.newProcessorDescription.copy(jar =
+ Util.uploadJar(jar.get, jarStoreClient))
+ val dagOperationWithJar = replace.copy(newProcessorDescription = description)
+ replaceProcessor(dagOperationWithJar)
+ }
+ } else {
+ replaceProcessor(dagOperation)
}
- } else {
- replaceProcessor(dagOperation)
}
+ } ~ (post & entity(as[FormData])) { _ =>
+ replaceProcessor(dagOperation)
}
- } ~ (post & entity(as[FormData])) { _ =>
- replaceProcessor(dagOperation)
- }
}
} ~
path("stallingtasks") {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index 32c9f08..0b8409f 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -40,19 +40,19 @@ import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig,
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.worker.WorkerSummary
import org.apache.gearpump.cluster.{ClusterConfig, UserConfig}
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer}
import org.apache.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription}
import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
import org.apache.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication}
import org.apache.gearpump.util.ActorUtil._
-import org.apache.gearpump.util.FileDirective._
+import FileDirective._
import org.apache.gearpump.util.{Constants, Graph, Util}
/** Manages service for master node */
class MasterService(val master: ActorRef,
- val jarStore: JarStoreService, override val system: ActorSystem)
+ val jarStoreClient: JarStoreClient, override val system: ActorSystem)
extends BasicService {
import upickle.default.{read, write}
@@ -116,8 +116,8 @@ class MasterService(val master: ActorRef,
path("submitapp") {
post {
uploadFile { form =>
- val jar = form.getFile("jar").map(_.file)
- val configFile = form.getFile("configfile").map(_.file)
+ val jar = form.getFileInfo("jar").map(_.file)
+ val configFile = form.getFileInfo("configfile").map(_.file)
val configString = form.getValue("configstring").getOrElse("")
val executorCount = form.getValue("executorcount").getOrElse("1").toInt
val args = form.getValue("args").getOrElse("")
@@ -139,8 +139,8 @@ class MasterService(val master: ActorRef,
path("submitstormapp") {
post {
uploadFile { form =>
- val jar = form.getFile("jar").map(_.file)
- val configFile = form.getFile("configfile").map(_.file)
+ val jar = form.getFileInfo("jar").map(_.file)
+ val configFile = form.getFileInfo("configfile").map(_.file)
val args = form.getValue("args").getOrElse("")
onComplete(Future(
MasterService.submitStormApp(jar, configFile, args, systemConfig)
@@ -180,12 +180,12 @@ class MasterService(val master: ActorRef,
} ~
path("uploadjar") {
uploadFile { form =>
- val jar = form.getFile("jar").map(_.file)
+ val jar = form.getFileInfo("jar").map(_.file)
if (jar.isEmpty) {
complete(write(
MasterService.Status(success = false, reason = "Jar file not found")))
} else {
- val jarFile = Util.uploadJar(jar.get, jarStore)
+ val jarFile = Util.uploadJar(jar.get, jarStoreClient)
complete(write(jarFile))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
index 7d67f60..d92972b 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
@@ -18,8 +18,6 @@
package org.apache.gearpump.services
-import org.apache.gearpump.jarstore.local.LocalJarStoreService
-
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -31,7 +29,7 @@ import akka.stream.ActorMaterializer
import akka.util.Timeout
import org.apache.commons.lang.exception.ExceptionUtils
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.JarStoreClient
import org.apache.gearpump.util.{Constants, LogUtil}
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
@@ -46,16 +44,7 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem
private val config = system.settings.config
- // only LocalJarStoreService is supported now for "Compose DAG"
- // since DFSJarStoreService requires HDFS to be on the classpath.
- // Note this won't affect users "Submit Gearpump Application" through
- // dashboard with "jarstore.rootpath" set to HDFS.
- if (!JarStoreService.get(config).isInstanceOf[LocalJarStoreService]) {
- LOG.warn("only local jar store is supported for Compose DAG")
- }
- private val jarStoreService = new LocalJarStoreService
- jarStoreService.init(config, system)
-
+ private val jarStoreClient = new JarStoreClient(config, system)
private val securityEnabled = config.getBoolean(
Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
@@ -101,9 +90,9 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem
private def services: RouteService = {
val admin = new AdminService(system)
- val masterService = new MasterService(master, jarStoreService, system)
+ val masterService = new MasterService(master, jarStoreClient, system)
val worker = new WorkerService(master, system)
- val app = new AppMasterService(master, jarStoreService, system)
+ val app = new AppMasterService(master, jarStoreClient, system)
val sup = new SupervisorService(master, supervisor, system)
new RouteService {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
index 2ece554..cec7367 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
@@ -36,7 +36,7 @@ import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMaste
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
import org.apache.gearpump.util.LogUtil
// NOTE: This cannot be removed!!!
@@ -47,19 +47,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
override def testConfig: Config = TestUtil.UI_CONFIG
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private def actorRefFactory = system
-
val mockAppMaster = TestProbe()
val failure = LastFailure(System.currentTimeMillis(), "Some error")
-
- lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
- def jarStore: JarStoreService = jarStoreService
+ val jarStoreClient = new JarStoreClient(system.settings.config, system)
private def master = mockMaster.ref
- private def appMasterRoute = new AppMasterService(master, jarStore, system).route
+ private def appMasterRoute = new AppMasterService(master, jarStoreClient, system).route
mockAppMaster.setAutoPilot {
new AutoPilot {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
index e365e9f..39c0de0 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
@@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersD
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
@@ -53,17 +53,13 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
override def testConfig: Config = TestUtil.UI_CONFIG
- private def actorRefFactory = system
val workerId = 0
val mockWorker = TestProbe()
- lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
+ val jarStoreClient = new JarStoreClient(system.settings.config, system)
private def master = mockMaster.ref
- def jarStore: JarStoreService = jarStoreService
-
- private def masterRoute = new MasterService(master, jarStore, system).route
+ private def masterRoute = new MasterService(master, jarStoreClient, system).route
mockWorker.setAutoPilot {
new AutoPilot {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
index 4658c98..b0e2101 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
@@ -34,7 +34,6 @@ import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWor
import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.jarstore.JarStoreService
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
@@ -49,8 +48,6 @@ class WorkerServiceSpec
protected def master = mockMaster.ref
- lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
protected def workerRoute = new WorkerService(master, system).route
mockWorker.setAutoPilot {
[2/2] incubator-gearpump git commit: fix GEARPUMP-205 remove hdfs
dependency from gear's classpath
Posted by ma...@apache.org.
fix GEARPUMP-205 remove hdfs dependency from gear's classpath
Raise the PR to run the unit tests (UT) on Travis CI.
Author: huafengw <fv...@gmail.com>
Closes #81 from huafengw/blob.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f8f91664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f8f91664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f8f91664
Branch: refs/heads/master
Commit: f8f916645cfb9b83767c9d3c3912d04825f38636
Parents: 6852b56
Author: huafengw <fv...@gmail.com>
Authored: Wed Sep 7 12:17:52 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Sep 7 12:17:52 2016 +0800
----------------------------------------------------------------------
.../gearpump/cluster/client/ClientContext.scala | 8 +-
.../gearpump/cluster/main/AppSubmitter.scala | 105 +++++++++++
.../org/apache/gearpump/cluster/main/Gear.scala | 80 +++++++++
.../org/apache/gearpump/cluster/main/Info.scala | 52 ++++++
.../org/apache/gearpump/cluster/main/Kill.scala | 49 ++++++
.../gearpump/cluster/main/MainRunner.scala | 42 +++++
.../apache/gearpump/cluster/main/Replay.scala | 47 +++++
.../gearpump/jarstore/FileDirective.scala | 172 +++++++++++++++++++
.../apache/gearpump/jarstore/FileServer.scala | 160 +++++++++++++++++
.../org/apache/gearpump/jarstore/JarStore.scala | 82 +++++++++
.../gearpump/jarstore/JarStoreClient.scala | 73 ++++++++
.../gearpump/jarstore/JarStoreServer.scala | 52 ++++++
.../gearpump/jarstore/JarStoreService.scala | 86 ----------
.../gearpump/jarstore/local/LocalJarStore.scala | 72 ++++++++
.../scala/org/apache/gearpump/util/Util.scala | 6 +-
.../gearpump/jarstore/FileServerSpec.scala | 129 ++++++++++++++
.../org.apache.gearpump.jarstore.JarStore | 20 +++
...org.apache.gearpump.jarstore.JarStoreService | 20 ---
.../gearpump/cluster/main/AppSubmitter.scala | 106 ------------
.../org/apache/gearpump/cluster/main/Gear.scala | 81 ---------
.../org/apache/gearpump/cluster/main/Info.scala | 53 ------
.../org/apache/gearpump/cluster/main/Kill.scala | 50 ------
.../gearpump/cluster/main/MainRunner.scala | 43 -----
.../apache/gearpump/cluster/main/Replay.scala | 48 ------
.../apache/gearpump/cluster/master/Master.scala | 10 +-
.../apache/gearpump/cluster/worker/Worker.scala | 11 +-
.../gearpump/jarstore/dfs/DFSJarStore.scala | 67 ++++++++
.../jarstore/dfs/DFSJarStoreService.scala | 76 --------
.../gearpump/jarstore/local/LocalJarStore.scala | 64 -------
.../jarstore/local/LocalJarStoreService.scala | 81 ---------
.../apache/gearpump/util/FileDirective.scala | 140 ---------------
.../org/apache/gearpump/util/FileServer.scala | 167 ------------------
.../org.apache.gearpump.jarstore.JarStore | 20 +++
...org.apache.gearpump.jarstore.JarStoreService | 20 ---
.../apache/gearpump/util/FileServerSpec.scala | 120 -------------
project/Build.scala | 9 +-
project/Pack.scala | 5 +-
.../gearpump/services/AppMasterService.scala | 34 ++--
.../gearpump/services/MasterService.scala | 18 +-
.../apache/gearpump/services/RestServices.scala | 19 +-
.../services/AppMasterServiceSpec.scala | 12 +-
.../gearpump/services/MasterServiceSpec.scala | 10 +-
.../gearpump/services/WorkerServiceSpec.scala | 3 -
43 files changed, 1279 insertions(+), 1243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index 245f1bc..0cba079 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -33,7 +33,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFrom
import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult
import org.apache.gearpump.cluster._
import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
@@ -59,8 +59,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
LOG.info(s"Starting system ${system.name}")
val shouldCleanupSystem = Option(sys).isEmpty
- private val jarStoreService = JarStoreService.get(config)
- jarStoreService.init(config, system)
+ private val jarStoreClient = new JarStoreClient(config, system)
private lazy val master: ActorRef = {
val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
@@ -140,8 +139,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
private def loadFile(jarPath: String): AppJar = {
val jarFile = new java.io.File(jarPath)
- val path = jarStoreService.copyFromLocal(jarFile)
- AppJar(jarFile.getName, path)
+ Util.uploadJar(jarFile, jarStoreClient)
}
private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
new file mode 100644
index 0000000..b2eef7d
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import java.io.File
+import java.net.{URL, URLClassLoader}
+import java.util.jar.JarFile
+
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+import org.slf4j.Logger
+
+/** Tool to submit an application jar to cluster */
+object AppSubmitter extends AkkaApp with ArgumentsParser {
+ val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val ignoreUnknownArgument = true
+
+ override val description = "Submit an application to Master by providing a jar"
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
+ defaultValue = Some("")),
+ "jar" -> CLIOption("<application>.jar", required = true),
+ "executors" -> CLIOption[Int]("number of executor to launch", required = false,
+ defaultValue = Some(1)),
+ "verbose" -> CLIOption("<print verbose log on console>", required = false,
+ defaultValue = Some(false)),
+ // For document purpose only, OPTION_CONFIG option is not used here.
+ // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+
+ val config = parse(args)
+ if (null != config) {
+
+ val verbose = config.getBoolean("verbose")
+ if (verbose) {
+ LogUtil.verboseLogToConsole()
+ }
+
+ val jar = config.getString("jar")
+
+ // Set jar path to be submitted to cluster
+ System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
+ System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
+
+ val namePrefix = config.getString("namePrefix")
+ if (namePrefix.nonEmpty) {
+ if (!Util.validApplicationName(namePrefix)) {
+ throw new Exception(s"$namePrefix is not a valid prefix for an application name")
+ }
+ System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
+ }
+
+ val jarFile = new java.io.File(jar)
+
+ // Start main class
+ if (!jarFile.exists()) {
+ throw new Exception(s"jar $jar does not exist")
+ }
+
+ val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" +
+ jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader)
+ val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
+
+ // Set to context classLoader. ActorSystem pick context classLoader in preference
+ Thread.currentThread().setContextClassLoader(classLoader)
+ val clazz = classLoader.loadClass(main)
+ val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+ mainMethod.invoke(null, arguments)
+ }
+ }
+
+ private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
+ : (String, Array[String]) = {
+ val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.
+ getValue("Main-Class")).getOrElse("")
+
+ if (remainArgs.length > 0) {
+ classLoader.loadClass(remainArgs(0))
+ (remainArgs(0), remainArgs.drop(1))
+ } else if (mainInManifest.nonEmpty) {
+ (mainInManifest, remainArgs)
+ } else {
+ throw new Exception("No main class specified")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
new file mode 100644
index 0000000..1511469
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.util.{Constants, LogUtil}
+import org.slf4j.Logger
+
+object Gear {
+
+ val OPTION_CONFIG = "conf"
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
+ "info" -> Info, "replay" -> Replay, "main" -> MainRunner)
+
+ def usage(): Unit = {
+ val keys = commands.keys.toList.sorted
+ // scalastyle:off println
+ Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+ // scalastyle:on println
+ }
+
+ private def executeCommand(command: String, commandArgs: Array[String]) = {
+ commands.get(command).map(_.main(commandArgs))
+ if (!commands.contains(command)) {
+ val allArgs = (command +: commandArgs.toList).toArray
+ MainRunner.main(allArgs)
+ }
+ }
+
+ def main(inputArgs: Array[String]): Unit = {
+ val (configFile, args) = extractConfig(inputArgs)
+ if (configFile != null) {
+ // Sets custom config file...
+ System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
+ }
+
+ if (args.length == 0) {
+ usage()
+ } else {
+ val command = args(0)
+ val commandArgs = args.drop(1)
+ executeCommand(command, commandArgs)
+ }
+ }
+
+ private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = {
+ var index = 0
+
+ var result = List.empty[String]
+ var configFile: String = null
+ while (index < inputArgs.length) {
+ val item = inputArgs(index)
+ if (item == s"-$OPTION_CONFIG") {
+ index += 1
+ configFile = inputArgs(index)
+ } else {
+ result = result :+ item
+ }
+ index += 1
+ }
+ (configFile, result.toArray)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
new file mode 100644
index 0000000..e1fe291
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+/** Tool to query master info */
+object Info extends AkkaApp with ArgumentsParser {
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ // For document purpose only, OPTION_CONFIG option is not used here.
+ // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
+
+ override val description = "Query the Application list"
+
+ // scalastyle:off println
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+ val client = ClientContext(akkaConf)
+
+ val AppMastersData(appMasters) = client.listApps
+ Console.println("== Application Information ==")
+ Console.println("====================================")
+ appMasters.foreach { appData =>
+ Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " +
+ s"status: ${appData.status}, worker: ${appData.workerPath}")
+ }
+ client.close()
+ }
+ // scalastyle:on println
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
new file mode 100644
index 0000000..8ecaf85
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+/** Tool to kill an App */
+object Kill extends AkkaApp with ArgumentsParser {
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "appid" -> CLIOption("<application id>", required = true),
+ // For document purpose only, OPTION_CONFIG option is not used here.
+ // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
+
+ override val description = "Kill an application with application Id"
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+
+ if (null != config) {
+ val client = ClientContext(akkaConf)
+ LOG.info("Client ")
+ client.shutdown(config.getInt("appid"))
+ client.close()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
new file mode 100644
index 0000000..8664232
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+/**
+ * Tool to run any main class by providing a jar.
+ *
+ * The first argument is the fully qualified name of the class to run; all remaining
+ * arguments are handed to that class's `main` method unchanged.
+ */
+object MainRunner extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    // For document purpose only, OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  /**
+   * Loads the class named by `args(0)` through the context class loader and invokes its
+   * static `main(Array[String])` with the remaining arguments.
+   *
+   * @param akkaConf not used by this tool.
+   * @param args main class name followed by the arguments for that class.
+   * @throws IllegalArgumentException if no main class name is given.
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    // Fail with a readable message instead of a bare ArrayIndexOutOfBoundsException.
+    require(args.nonEmpty, "Missing main class: expected <mainClass> [arguments...]")
+
+    val mainClazz = args.head
+    val commandArgs = args.drop(1)
+
+    val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz)
+    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+    // null receiver: main is expected to be a static method.
+    mainMethod.invoke(null, commandArgs)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
new file mode 100644
index 0000000..e648d61
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+/**
+ * Internal tool to restart (replay) an application.
+ *
+ * Expects the required option "appid".
+ */
+object Replay extends AkkaApp with ArgumentsParser {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "appid" -> CLIOption("<application id>", required = true),
+    // For document purpose only, OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  override val description = "Replay the application from current min clock(low watermark)"
+
+  /**
+   * Parses the arguments and asks the cluster to replay the given application.
+   *
+   * @param akkaConf configuration used to create the ClientContext.
+   * @param args raw command line arguments.
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    // The original code guards against a null parse result; keep that contract.
+    if (null != config) {
+      // Read the id before the client is created so a malformed value cannot leak a client.
+      val appId = config.getInt("appid")
+      val client = ClientContext(akkaConf)
+      try {
+        client.replayFromTimestampWindowTrailingEdge(appId)
+      } finally {
+        // Always release the client, even when the replay request fails.
+        client.close()
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
new file mode 100644
index 0000000..969da04
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.time.Instant
+import scala.concurrent.{ExecutionContext, Future}
+
+import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.stream.Materializer
+import akka.stream.scaladsl.{StreamConverters, FileIO}
+import akka.util.ByteString
+
+
+/**
+ * FileDirective is a set of Akka-http directive to upload/download
+ * huge binary files to/from Akka-Http server.
+ */
+object FileDirective {
+
+ // Form field name
+ type Name = String
+
+ val CHUNK_SIZE = 262144 // 262144 bytes = 256 KiB; chunk size handed to StreamConverters.fromInputStream in downloadFileFrom
+
+ /**
+ * Information about a file after it has been uploaded to the server.
+ *
+ * @param originFileName the file name reported by the client in the multipart form.
+ * @param file the file on the server that holds the uploaded content.
+ * @param length the number of bytes written to `file`.
+ */
+ case class FileInfo(originFileName: String, file: File, length: Long)
+
+  /**
+   * Parsed multipart form: each field is either an uploaded file or a plain string value.
+   *
+   * @param fields form content keyed by form field name.
+   */
+  class Form(val fields: Map[Name, FormField]) {
+
+    /** Returns the uploaded file stored under `fieldName`, or None if absent or not a file. */
+    def getFileInfo(fieldName: String): Option[FileInfo] =
+      fields.get(fieldName).flatMap { field =>
+        field.fold[Option[FileInfo]](info => Option(info), _ => None)
+      }
+
+    /** Returns the string value stored under `fieldName`, or None if absent or a file. */
+    def getValue(fieldName: String): Option[String] =
+      fields.get(fieldName).flatMap { field =>
+        field.fold[Option[String]](_ => None, text => Option(text))
+      }
+  }
+
+ type FormField = Either[FileInfo, String] // Left: an uploaded file; Right: the text value of a non-file field
+
+  /**
+   * Directive that saves every uploaded file part to a temporary file and passes the
+   * resulting Form (file parts and plain text fields) to the inner route.
+   */
+  def uploadFile: Directive1[Form] = Directive[Tuple1[Form]] { inner =>
+    extractMaterializer { implicit materializer =>
+      extractExecutionContext { implicit executor =>
+        uploadFileImpl(materializer, executor) { futureForm => requestContext =>
+          // Run the inner route only once the whole form has been consumed.
+          futureForm.flatMap(form => inner(Tuple1(form))(requestContext))
+        }
+      }
+    }
+  }
+
+  /**
+   * Directive that streams every uploaded file part into the given JarStore and passes a
+   * Map from form field name to the FilePath in the JarStore to the inner route.
+   */
+  def uploadFileTo(jarStore: JarStore): Directive1[Map[Name, FilePath]] =
+    Directive[Tuple1[Map[Name, FilePath]]] { inner =>
+      extractMaterializer { implicit materializer =>
+        extractExecutionContext { implicit executor =>
+          uploadFileImpl(jarStore)(materializer, executor) { futureFiles => requestContext =>
+            // Run the inner route only once every part has been written to the store.
+            futureFiles.flatMap(files => inner(Tuple1(files))(requestContext))
+          }
+        }
+      }
+    }
+
+  /**
+   * Completes the request with the content of `filePath`, read from the JarStore and
+   * streamed back as application/octet-stream in chunks of CHUNK_SIZE bytes.
+   */
+  def downloadFileFrom(jarStore: JarStore, filePath: String): Route = {
+    // The store is only opened when the response stream is materialized.
+    val content = StreamConverters.fromInputStream(() => jarStore.getFile(filePath), CHUNK_SIZE)
+    complete(HttpEntity(MediaTypes.`application/octet-stream`, content))
+  }
+
+  /**
+   * Streams every file part of a multipart form into the JarStore.
+   *
+   * Each file is stored under "<current epoch millis><file name>". The file name comes from
+   * the client, so any directory components are stripped before it is used; otherwise a
+   * name such as "/../../x" would let the upload escape the store's root.
+   *
+   * @return a directive providing a Future of the Map from form field name to the stored
+   *         FilePath; parts without a file name and empty files are left out.
+   */
+  private def uploadFileImpl(jarStore: JarStore)
+    (implicit mat: Materializer, ec: ExecutionContext): Directive1[Future[Map[Name, FilePath]]] = {
+    Directive[Tuple1[Future[Map[Name, FilePath]]]] { inner =>
+      entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
+        val fileNameMap = formdata.parts.mapAsync(1) { p =>
+          p.filename match {
+            case Some(clientFileName) =>
+              // Keep only the last path segment; '\' is normalized so that Windows style
+              // paths sent by some browsers are reduced to the plain file name as well.
+              val safeName = new File(clientFileName.replace('\\', '/')).getName
+              val path = s"${Instant.now().toEpochMilli}$safeName"
+              val sink = StreamConverters.fromOutputStream(() => jarStore.createFile(path),
+                autoFlush = true)
+              p.entity.dataBytes.runWith(sink).map { written =>
+                if (written.count > 0) {
+                  Map(p.name -> FilePath(path))
+                } else {
+                  Map.empty[Name, FilePath]
+                }
+              }
+            case None =>
+              // Not a file part: nothing to store, and no need to schedule a task for it.
+              Future.successful(Map.empty[Name, FilePath])
+          }
+        }.runFold(Map.empty[Name, FilePath])((set, value) => set ++ value)
+        inner(Tuple1(fileNameMap))
+      }
+    }
+  }
+
+  /**
+   * Consumes a multipart form: file parts are written to temporary files, all other parts
+   * are read fully and decoded as UTF-8 text.
+   *
+   * @return a directive providing a Future of the resulting Form; empty files are left out.
+   */
+  private def uploadFileImpl(implicit mat: Materializer, ec: ExecutionContext)
+    : Directive1[Future[Form]] = {
+    Directive[Tuple1[Future[Form]]] { inner =>
+      entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
+        val parsedParts = formdata.parts.mapAsync(1) { part =>
+          part.filename match {
+            case Some(originName) =>
+              // File part: spill the bytes to a temporary file named after the form field.
+              val tempFile = File.createTempFile(s"userfile_${part.name}_", s"$originName")
+              part.entity.dataBytes.runWith(FileIO.toFile(tempFile)).map { result =>
+                if (result.count > 0) {
+                  Map(part.name -> Left(FileInfo(originName, tempFile, result.count)))
+                } else {
+                  Map.empty[Name, FormField]
+                }
+              }
+            case None =>
+              // Plain field: concatenate all chunks, then decode once.
+              part.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map { bytes =>
+                Map(part.name -> Right(bytes.utf8String))
+              }
+          }
+        }
+
+        val form = parsedParts.runFold(new Form(Map.empty[Name, FormField])) { (acc, fields) =>
+          new Form(acc.fields ++ fields)
+        }
+
+        inner(Tuple1(form))
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
new file mode 100644
index 0000000..4ce8f2d
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import scala.concurrent.{ExecutionContext, Future}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.Http.ServerBinding
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model.Uri.{Path, Query}
+import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{FileIO, Sink, Source}
+import spray.json.DefaultJsonProtocol._
+import spray.json.JsonFormat
+
+import org.apache.gearpump.jarstore.FileDirective._
+import org.apache.gearpump.jarstore.FileServer.Port
+
+/**
+ * A simple file server implemented with akka-http to store/fetch large
+ * binary files.
+ *
+ * Routes:
+ *  - "upload": stores the uploaded file in the JarStore and replies with its path.
+ *  - "download": streams the JarStore file named by the "file" query parameter.
+ *  - root path: a minimal HTML upload form.
+ *
+ * @param system actor system used to run the HTTP server.
+ * @param host interface to bind to.
+ * @param port port to bind to; the port actually bound is reported by `start`.
+ * @param jarStore backing store for the uploaded files.
+ */
+class FileServer(system: ActorSystem, host: String, port: Int = 0, jarStore: JarStore) {
+  import system.dispatcher
+  implicit val actorSystem = system
+  implicit val materializer = ActorMaterializer()
+  implicit def ec: ExecutionContext = system.dispatcher
+
+  val route: Route = {
+    path("upload") {
+      uploadFileTo(jarStore) { form =>
+        // Only the first stored file is reported back to the client.
+        form.headOption match {
+          case Some((_, uploadedFilePath)) =>
+            complete(uploadedFilePath.path)
+          case None =>
+            failWith(new Exception("File not found in the uploaded form"))
+        }
+      }
+    } ~
+    path("download") {
+      parameters("file") { file: String =>
+        downloadFileFrom(jarStore, file)
+      }
+    } ~
+    pathEndOrSingleSlash {
+      extractUri { uri =>
+        val upload = uri.withPath(Uri.Path("/upload")).toString()
+        val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
+          s"""
+            |
+            |<h2>Please specify a file to upload:</h2>
+            |<form action="$upload" enctype="multipart/form-data" method="post">
+            |<input type="file" name="datafile" size="40">
+            |</p>
+            |<div>
+            |<input type="submit" value="Submit">
+            |</div>
+            |</form>
+          """.stripMargin)
+        complete(entity)
+      }
+    }
+  }
+
+  // Binding created by `start`; stays null until the server has been started.
+  private var connection: Future[ServerBinding] = null
+
+  /**
+   * Binds the server to host/port.
+   *
+   * @return the port the server is actually listening on.
+   */
+  def start: Future[Port] = {
+    connection = Http().bindAndHandle(Route.handlerFlow(route), host, port)
+    connection.map(address => Port(address.localAddress.getPort))
+  }
+
+  /**
+   * Unbinds the server. Completes immediately when the server was never started, instead
+   * of failing with a NullPointerException.
+   */
+  def stop: Future[Unit] = {
+    Option(connection) match {
+      case Some(binding) => binding.flatMap(_.unbind())
+      case None => Future.successful(())
+    }
+  }
+}
+
+object FileServer {
+
+  implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply)
+
+  /** Port a FileServer is listening on; returned by `FileServer.start`. */
+  case class Port(port: Int)
+
+  /**
+   * Client of [[org.apache.gearpump.jarstore.FileServer]]
+   *
+   * @param system actor system used to run the HTTP requests.
+   * @param host host of the file server.
+   * @param port port of the file server.
+   */
+  class Client(system: ActorSystem, host: String, port: Int) {
+
+    /** Creates a client from the server url; only the host and port of the url are used. */
+    def this(system: ActorSystem, url: String) = {
+      this(system, Uri(url).authority.host.address(), Uri(url).authority.port)
+    }
+
+    private implicit val actorSystem = system
+    private implicit val materializer = ActorMaterializer()
+    private implicit val ec = system.dispatcher
+
+    val server = Uri(s"http://$host:$port")
+    val httpClient = Http(system).outgoingConnection(server.authority.host.address(),
+      server.authority.port)
+
+    /**
+     * Uploads a local file to the server.
+     *
+     * @return the path of the stored file as reported by the server; the Future fails when
+     *         the server does not answer with a success status.
+     */
+    def upload(file: File): Future[FilePath] = {
+      val target = server.withPath(Path("/upload"))
+
+      val request = entity(file).map { entity =>
+        HttpRequest(HttpMethods.POST, uri = target, entity = entity)
+      }
+
+      val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
+      response
+        .flatMap(ensureSuccess(s"upload ${file.getName}", _))
+        .flatMap(success => Unmarshal(success).to[String])
+        .map(path => FilePath(path))
+    }
+
+    /**
+     * Downloads `remoteFile` from the server and writes it to `saveAs`.
+     *
+     * The Future fails when the server does not answer with a success status, so an error
+     * page is never saved as if it were the requested file.
+     */
+    def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
+      val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
+      // Download file to local
+      val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head)
+      val downloaded = response
+        .flatMap(ensureSuccess(s"download ${remoteFile.path}", _))
+        .flatMap(success => success.entity.dataBytes.runWith(FileIO.toFile(saveAs)))
+      downloaded.map(_ => ())
+    }
+
+    // Passes successful responses through and turns every other status into a failed Future.
+    private def ensureSuccess(action: String, response: HttpResponse): Future[HttpResponse] = {
+      if (response.status.isSuccess) {
+        Future.successful(response)
+      } else {
+        // Consume the unread body so the response stream is not left dangling.
+        response.entity.dataBytes.runWith(Sink.ignore)
+        Future.failed(new java.io.IOException(
+          s"Failed to $action, server $server responded with ${response.status}"))
+      }
+    }
+
+    // Wraps the file into a multipart form with a single part named "uploadfile".
+    private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
+      val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+        FileIO.fromFile(file, chunkSize = 100000))
+      val body = Source.single(
+        Multipart.FormData.BodyPart(
+          "uploadfile",
+          entity,
+          Map("filename" -> file.getName)))
+      val form = Multipart.FormData(body)
+
+      Marshal(form).to[RequestEntity]
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala
new file mode 100644
index 0000000..a4a411f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStore.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.{InputStream, OutputStream}
+import java.net.URI
+import java.util.ServiceLoader
+
+import com.typesafe.config.Config
+import org.apache.gearpump.util.Util
+
+import scala.collection.JavaConverters._
+
+case class FilePath(path: String)
+
+/**
+ * Storage abstraction for binary files such as user submitted application jars.
+ * Implementations offer stream based write (upload) and read (download) access.
+ */
+trait JarStore {
+  /**
+   * Scheme handled by this JarStore, for example "hdfs" for an HDFS file system
+   * or "file" for a local file system.
+   */
+  val scheme: String
+
+  /**
+   * Initializes the JarStore from the given configuration.
+   */
+  def init(config: Config): Unit
+
+  /**
+   * Creates a file in the JarStore.
+   *
+   * @param fileName name of the file to create.
+   * @return a stream into which the content of the file can be written.
+   */
+  def createFile(fileName: String): OutputStream
+
+  /**
+   * Opens a file of the JarStore for reading.
+   *
+   * @param fileName name of the file to read.
+   * @return a stream from which the content of the file can be read.
+   */
+  def getFile(fileName: String): InputStream
+}
+
+object JarStore {
+
+  // All JarStore implementations discovered through java.util.ServiceLoader, loaded once.
+  private lazy val jarstores: List[JarStore] = {
+    ServiceLoader.load(classOf[JarStore]).asScala.toList
+  }
+
+  /**
+   * Gets the active JarStore whose scheme matches the scheme of the given root path.
+   *
+   * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for
+   * more information.
+   *
+   * @param rootPath root path of the jar store; its scheme selects the implementation.
+   * @throws NoSuchElementException if no registered JarStore handles that scheme.
+   */
+  def get(rootPath: String): JarStore = {
+    val scheme = new URI(Util.resolvePath(rootPath)).getScheme
+    jarstores.find(_.scheme == scheme).getOrElse {
+      // Same exception type as the former bare Option.get, but with an actionable message.
+      throw new NoSuchElementException(
+        s"No JarStore found for scheme '$scheme' (root path: $rootPath), " +
+          s"available schemes: ${jarstores.map(_.scheme).mkString(", ")}")
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala
new file mode 100644
index 0000000..59cc405
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreClient.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.Await
+
+import akka.pattern.ask
+import akka.actor.{ActorSystem, ActorRef}
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.util.{Util, Constants, LogUtil}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import scala.concurrent.{Future, ExecutionContext}
+
+/**
+ * Client side access to the cluster's jar store.
+ *
+ * The address of the jar store file server is asked from the master (GetJarStoreServer)
+ * on first use; files are then transferred with a FileServer.Client.
+ *
+ * @param config configuration holding the list of cluster masters.
+ * @param system actor system used for the master proxy and the file transfers.
+ */
+class JarStoreClient(config: Config, system: ActorSystem) {
+  private def LOG: Logger = LogUtil.getLogger(getClass)
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  private implicit def dispatcher: ExecutionContext = system.dispatcher
+
+  private val master: ActorRef = {
+    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
+    system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
+  }
+
+  // mapTo fails the Future with a ClassCastException if the master answers with another type.
+  private lazy val client: Future[FileServer.Client] =
+    (master ? GetJarStoreServer).mapTo[JarStoreServerAddress]
+      .map(address => new FileServer.Client(system, address.url))
+
+  /**
+   * This function will copy the remote file to local file system, called from client side.
+   *
+   * Blocks for up to 60 seconds and throws if the download fails or times out.
+   *
+   * @param localFile The destination of file path
+   * @param remotePath The remote file path from JarStore
+   */
+  def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
+    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath")
+    val future = client.flatMap(_.download(remotePath, localFile))
+    // Await.result (not Await.ready) so that a failed download is rethrown to the caller
+    // instead of being silently dropped.
+    Await.result(future, Duration(60, TimeUnit.SECONDS))
+  }
+
+  /**
+   * This function will copy the local file to the remote JarStore, called from client side.
+   *
+   * Blocks for up to 60 seconds and throws if the upload fails or times out.
+   *
+   * @param localFile The local file
+   * @return the path of the uploaded file in the JarStore.
+   */
+  def copyFromLocal(localFile: File): FilePath = {
+    val future = client.flatMap(_.upload(localFile))
+    Await.result(future, Duration(60, TimeUnit.SECONDS))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala
new file mode 100644
index 0000000..1fb0de5
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreServer.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore
+
+import akka.actor.{Actor, Stash}
+import akka.pattern.pipe
+
+import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import org.apache.gearpump.util._
+
+class JarStoreServer(jarStoreRootPath: String) extends Actor with Stash {
+ private val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
+ private val jarStore = JarStore.get(jarStoreRootPath) // implementation is picked by the scheme of the root path
+ jarStore.init(context.system.settings.config) // initialize the store before it is handed to the FileServer
+ private val server = new FileServer(context.system, host, 0, jarStore) // requested port 0; the bound port arrives as FileServer.Port
+ implicit val timeout = Constants.FUTURE_TIMEOUT
+ implicit val executionContext = context.dispatcher
+
+ server.start pipeTo self // the result of the bind is delivered to this actor as a message
+
+ def receive: Receive = { // initial behavior: wait for the bound port
+ case FileServer.Port(port) =>
+ context.become(listen(port))
+ unstashAll() // replay the requests that arrived before the server was bound
+ case _ =>
+ stash() // NOTE(review): a failed bind presumably arrives as akka.actor.Status.Failure and is stashed here too - confirm
+ }
+
+ def listen(port: Int): Receive = { // behavior once the file server is bound
+ case GetJarStoreServer =>
+ sender ! JarStoreServerAddress(s"http://$host:$port/")
+ }
+
+ override def postStop(): Unit = {
+ server.stop // the returned Future is not awaited
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
deleted file mode 100644
index 0ba9558..0000000
--- a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.jarstore
-
-import java.io.File
-import java.net.URI
-import java.util.ServiceLoader
-import scala.collection.JavaConverters._
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-
-import org.apache.gearpump.util.{Constants, Util}
-
-case class FilePath(path: String)
-
-/**
- * JarStoreService is used to manage the upload/download of binary files,
- * like user submitted application jar.
- */
-trait JarStoreService {
- /**
- * The scheme of the JarStoreService.
- * Like "hdfs" for HDFS file system, and "file" for a local
- * file system.
- */
- val scheme: String
-
- /**
- * Init the Jar Store.
- */
- def init(config: Config, system: ActorSystem)
-
- /**
- * This function will copy the local file to the remote JarStore, called from client side.
- * @param localFile The local file
- */
- def copyFromLocal(localFile: File): FilePath
-
- /**
- * This function will copy the remote file to local file system, called from client side.
- *
- * @param localFile The destination of file path
- * @param remotePath The remote file path from JarStore
- */
- def copyToLocalFile(localFile: File, remotePath: FilePath)
-}
-
-object JarStoreService {
-
- /**
- * Get a active JarStoreService by specifying a scheme.
- *
- * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for
- * more information.
- */
- def get(config: Config): JarStoreService = {
- val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
- get(jarStoreRootPath)
- }
-
- private lazy val jarstoreServices: List[JarStoreService] = {
- ServiceLoader.load(classOf[JarStoreService]).asScala.toList
- }
-
- private def get(rootPath: String): JarStoreService = {
- val scheme = new URI(Util.resolvePath(rootPath)).getScheme
- jarstoreServices.find(_.scheme == scheme).get
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
new file mode 100644
index 0000000..c15a9be
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore.local
+
+import java.io._
+
+import com.typesafe.config.Config
+import org.apache.gearpump.jarstore.JarStore
+import org.apache.gearpump.util.{LogUtil, FileUtils, Constants}
+import org.slf4j.Logger
+
+/**
+ * LocalJarStore stores the uploaded jars on local disk, below the directory configured
+ * with Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH.
+ *
+ * `init` has to be called before files are created or read.
+ */
+class LocalJarStore extends JarStore {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private var rootPath: String = null
+  override val scheme: String = "file"
+
+  /** Stream that is always at end of file; returned when a file cannot be opened. */
+  class ClosedInputStream extends InputStream {
+    override def read(): Int = -1
+  }
+
+  /** Reads the root directory from the configuration and creates it if necessary. */
+  override def init(config: Config): Unit = {
+    rootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+    FileUtils.forceMkdir(new File(rootPath))
+  }
+
+  /**
+   * Resolves a file name against the root directory and rejects names that would end up
+   * outside of it (for example names containing ".."), since file names can originate
+   * from remote upload/download requests.
+   */
+  private def resolve(fileName: String): File = {
+    val root = new File(rootPath).getCanonicalFile
+    val target = new File(root, fileName).getCanonicalFile
+    if (!target.toPath.startsWith(root.toPath)) {
+      throw new IOException(s"File $fileName is outside of the jar store root $rootPath")
+    }
+    target
+  }
+
+  /**
+   * Creates the file on JarStore.
+   *
+   * @param fileName name of the file to be created on JarStore.
+   * @return OutputStream returns a stream into which the data can be written.
+   * @throws IOException if the file cannot be created or lies outside of the root directory.
+   */
+  override def createFile(fileName: String): OutputStream = {
+    new FileOutputStream(resolve(fileName))
+  }
+
+  /**
+   * Gets the InputStream to read the file
+   *
+   * @param fileName name of the file to be read on JarStore.
+   * @return InputStream returns a stream from which the data can be read; when the file
+   *         cannot be opened the error is logged and an empty stream is returned.
+   */
+  override def getFile(fileName: String): InputStream = {
+    try {
+      new FileInputStream(resolve(fileName))
+    } catch {
+      case ex: Exception =>
+        // Hand the exception to the logger so that the real stack trace is printed;
+        // interpolating ex.getStackTrace only prints the identity of the array.
+        LOG.error(s"Fetch file $fileName failed", ex)
+        new ClosedInputStream
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/main/scala/org/apache/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala b/core/src/main/scala/org/apache/gearpump/util/Util.scala
index 19bd5a8..0faa46a 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Util.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala
@@ -27,7 +27,7 @@ import scala.util.{Failure, Success, Try}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.gearpump.cluster.AppJar
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
import org.apache.gearpump.transport.HostPort
object Util {
@@ -123,8 +123,8 @@ object Util {
}
}
- def uploadJar(jarFile: File, jarStoreService: JarStoreService): AppJar = {
- val remotePath = jarStoreService.copyFromLocal(jarFile)
+ def uploadJar(jarFile: File, jarStoreClient: JarStoreClient): AppJar = {
+ val remotePath = jarStoreClient.copyFromLocal(jarFile)
AppJar(jarFile.getName, remotePath)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
new file mode 100644
index 0000000..c99a031
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import akka.actor.ActorSystem
+import com.typesafe.config.{ConfigValueFactory, ConfigValue}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.google.common.io.Files
+import org.apache.gearpump.jarstore.local.LocalJarStore
+import org.apache.gearpump.util.{FileUtils, LogUtil}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.apache.gearpump.jarstore.FileServer._
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Round-trip tests for FileServer backed by a LocalJarStore: data uploaded through
+ * Client must come back unchanged, and downloading an unknown path must yield an
+ * empty file.
+ */
+class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  implicit val timeout: akka.util.Timeout = akka.util.Timeout(25, TimeUnit.SECONDS)
+  val host = "localhost"
+  private val LOG = LogUtil.getLogger(getClass)
+
+  // Created in beforeAll and terminated in afterAll; null outside that window.
+  var system: ActorSystem = null
+
+  override def afterAll(): Unit = {
+    if (null != system) {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("FileServerSpec", TestUtil.DEFAULT_CONFIG)
+  }
+
+  /**
+   * Writes `data` to a temporary file, uploads it and returns the remote path.
+   * The temporary file is removed even when the upload fails or times out.
+   */
+  private def save(client: Client, data: Array[Byte]): FilePath = {
+    import scala.concurrent.duration._
+    val file = File.createTempFile("fileserverspec", "test")
+    try {
+      FileUtils.writeByteArrayToFile(file, data)
+      Await.result(client.upload(file), 30.seconds)
+    } finally {
+      file.delete()
+    }
+  }
+
+  /**
+   * Downloads `remote` into a temporary file and returns its content.
+   * The temporary file is removed even when the download fails or times out.
+   */
+  private def get(client: Client, remote: FilePath): Array[Byte] = {
+    import scala.concurrent.duration._
+    val file = File.createTempFile("fileserverspec", "test")
+    try {
+      // Only completion matters here; the payload is read back from the file.
+      Await.result(client.download(remote, file), 10.seconds)
+      FileUtils.readFileToByteArray(file)
+    } finally {
+      file.delete()
+    }
+  }
+
+  /** Deletes `file`, descending into it first when it is a directory. */
+  private def deleteRecursively(file: File): Unit = {
+    // listFiles returns null for plain files and on I/O errors.
+    Option(file.listFiles).foreach(_.foreach(deleteRecursively))
+    file.delete()
+  }
+
+  /**
+   * Starts a FileServer on an ephemeral port over a fresh temporary jar store root and
+   * runs `testBody` with the root directory, a connected client and the server base URL.
+   * The server is stopped and the root directory removed whether or not the body succeeds.
+   */
+  private def withFileServer(testBody: (File, Client, String) => Unit): Unit = {
+    val rootDir = Files.createTempDir()
+    try {
+      val localJarStore: JarStore = new LocalJarStore
+      val conf = TestUtil.DEFAULT_CONFIG.withValue("gearpump.jarstore.rootpath",
+        ConfigValueFactory.fromAnyRef(rootDir.getAbsolutePath))
+      localJarStore.init(conf)
+
+      val server = new FileServer(system, host, 0, localJarStore)
+      val port = Await.result(server.start, Duration(25, TimeUnit.SECONDS))
+      try {
+        LOG.info("start test web server on port " + port)
+        val client = new Client(system, host, port.port)
+        testBody(rootDir, client, s"http://$host:${port.port}")
+      } finally {
+        server.stop
+      }
+    } finally {
+      // File.delete cannot remove a non-empty directory, and uploads leave files here.
+      deleteRecursively(rootDir)
+    }
+  }
+
+  "The file server" should {
+    "serve the data previously stored" in {
+      withFileServer { (rootDir, client, serverUrl) =>
+        val sizes = List(1, 100, 1000000, 50000000)
+
+        sizes.foreach { size =>
+          val bytes = randomBytes(size)
+          val url = s"$serverUrl/$size"
+          val remote = save(client, bytes)
+          val fetchedBytes = get(client, remote)
+          assert(fetchedBytes sameElements bytes, s"fetch data is corrupted, $url, $rootDir")
+        }
+      }
+    }
+
+    "handle missed file" in {
+      withFileServer { (_, client, _) =>
+        val fetchedBytes = get(client, FilePath("noexist"))
+        assert(fetchedBytes.length == 0)
+      }
+    }
+  }
+
+  private def randomBytes(size: Int): Array[Byte] = {
+    val bytes = new Array[Byte](size)
+    new java.util.Random().nextBytes(bytes)
+    bytes
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
new file mode 100644
index 0000000..e173a8a
--- /dev/null
+++ b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.local.LocalJarStore
+org.apache.gearpump.jarstore.dfs.DFSJarStore
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
deleted file mode 100644
index bf37316..0000000
--- a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.gearpump.jarstore.local.LocalJarStoreService
-org.apache.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
deleted file mode 100644
index d0de51a..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import java.io.File
-import java.net.{URL, URLClassLoader}
-import java.util.jar.JarFile
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
-
-/** Tool to submit an application jar to cluster */
-object AppSubmitter extends AkkaApp with ArgumentsParser {
- val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val ignoreUnknownArgument = true
-
- override val description = "Submit an application to Master by providing a jar"
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
- defaultValue = Some("")),
- "jar" -> CLIOption("<application>.jar", required = true),
- "executors" -> CLIOption[Int]("number of executor to launch", required = false,
- defaultValue = Some(1)),
- "verbose" -> CLIOption("<print verbose log on console>", required = false,
- defaultValue = Some(false)),
- // For document purpose only, OPTION_CONFIG option is not used here.
- // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
- defaultValue = None))
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
-
- val config = parse(args)
- if (null != config) {
-
- val verbose = config.getBoolean("verbose")
- if (verbose) {
- LogUtil.verboseLogToConsole()
- }
-
- val jar = config.getString("jar")
-
- // Set jar path to be submitted to cluster
- System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
- System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
-
- val namePrefix = config.getString("namePrefix")
- if (namePrefix.nonEmpty) {
- if (!Util.validApplicationName(namePrefix)) {
- throw new Exception(s"$namePrefix is not a valid prefix for an application name")
- }
- System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
- }
-
- val jarFile = new java.io.File(jar)
-
- // Start main class
- if (!jarFile.exists()) {
- throw new Exception(s"jar $jar does not exist")
- }
-
- val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" +
- jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader)
- val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
-
- // Set to context classLoader. ActorSystem pick context classLoader in preference
- Thread.currentThread().setContextClassLoader(classLoader)
- val clazz = classLoader.loadClass(main)
- val mainMethod = clazz.getMethod("main", classOf[Array[String]])
- mainMethod.invoke(null, arguments)
- }
- }
-
- private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
- : (String, Array[String]) = {
- val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.
- getValue("Main-Class")).getOrElse("")
-
- if (remainArgs.length > 0) {
- classLoader.loadClass(remainArgs(0))
- (remainArgs(0), remainArgs.drop(1))
- } else if (mainInManifest.nonEmpty) {
- (mainInManifest, remainArgs)
- } else {
- throw new Exception("No main class specified")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
deleted file mode 100644
index 672fee6..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.{Constants, LogUtil}
-
-object Gear {
-
- val OPTION_CONFIG = "conf"
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
- "info" -> Info, "replay" -> Replay, "main" -> MainRunner)
-
- def usage(): Unit = {
- val keys = commands.keys.toList.sorted
- // scalastyle:off println
- Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
- // scalastyle:on println
- }
-
- private def executeCommand(command: String, commandArgs: Array[String]) = {
- commands.get(command).map(_.main(commandArgs))
- if (!commands.contains(command)) {
- val allArgs = (command +: commandArgs.toList).toArray
- MainRunner.main(allArgs)
- }
- }
-
- def main(inputArgs: Array[String]): Unit = {
- val (configFile, args) = extractConfig(inputArgs)
- if (configFile != null) {
- // Sets custom config file...
- System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
- }
-
- if (args.length == 0) {
- usage()
- } else {
- val command = args(0)
- val commandArgs = args.drop(1)
- executeCommand(command, commandArgs)
- }
- }
-
- private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = {
- var index = 0
-
- var result = List.empty[String]
- var configFile: String = null
- while (index < inputArgs.length) {
- val item = inputArgs(index)
- if (item == s"-$OPTION_CONFIG") {
- index += 1
- configFile = inputArgs(index)
- } else {
- result = result :+ item
- }
- index += 1
- }
- (configFile, result.toArray)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
deleted file mode 100644
index bf444a3..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to query master info */
-object Info extends AkkaApp with ArgumentsParser {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- // For document purpose only, OPTION_CONFIG option is not used here.
- // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
- defaultValue = None))
-
- override val description = "Query the Application list"
-
- // scalastyle:off println
- def main(akkaConf: Config, args: Array[String]): Unit = {
- val client = ClientContext(akkaConf)
-
- val AppMastersData(appMasters) = client.listApps
- Console.println("== Application Information ==")
- Console.println("====================================")
- appMasters.foreach { appData =>
- Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " +
- s"status: ${appData.status}, worker: ${appData.workerPath}")
- }
- client.close()
- }
- // scalastyle:on println
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
deleted file mode 100644
index 17f6214..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to kill an App */
-object Kill extends AkkaApp with ArgumentsParser {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "appid" -> CLIOption("<application id>", required = true),
- // For document purpose only, OPTION_CONFIG option is not used here.
- // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
- defaultValue = None))
-
- override val description = "Kill an application with application Id"
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
-
- if (null != config) {
- val client = ClientContext(akkaConf)
- LOG.info("Client ")
- client.shutdown(config.getInt("appid"))
- client.close()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
deleted file mode 100644
index c6c9f10..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to run any main class by providing a jar */
-object MainRunner extends AkkaApp with ArgumentsParser {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- // For document purpose only, OPTION_CONFIG option is not used here.
- // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
- defaultValue = None))
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
- val mainClazz = args(0)
- val commandArgs = args.drop(1)
-
- val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz)
- val mainMethod = clazz.getMethod("main", classOf[Array[String]])
- mainMethod.invoke(null, commandArgs)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
deleted file mode 100644
index d721832..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-// Internal tool to restart an application
-object Replay extends AkkaApp with ArgumentsParser {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "appid" -> CLIOption("<application id>", required = true),
- // For document purpose only, OPTION_CONFIG option is not used here.
- // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
- defaultValue = None))
-
- override val description = "Replay the application from current min clock(low watermark)"
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
-
- if (null != config) {
- val client = ClientContext(akkaConf)
- client.replayFromTimestampWindowTrailingEdge(config.getInt("appid"))
- client.close()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
index 762cf27..6b4df07 100644
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -20,6 +20,7 @@ package org.apache.gearpump.cluster.master
import java.lang.management.ManagementFactory
import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.jarstore.JarStoreServer
import scala.collection.JavaConverters._
import scala.collection.immutable
@@ -40,7 +41,6 @@ import org.apache.gearpump.cluster.WorkerToMaster._
import org.apache.gearpump.cluster.master.InMemoryKVService._
import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.apache.gearpump.jarstore.local.LocalJarStore
import org.apache.gearpump.metrics.Metrics.ReportMetrics
import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import org.apache.gearpump.transport.HostPort
@@ -79,11 +79,7 @@ private[cluster] class Master extends Actor with Stash {
val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
- private val jarStore = if (Util.isLocalPath(jarStoreRootPath)) {
- Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath)))
- } else {
- None
- }
+ private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))
private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
@@ -162,7 +158,7 @@ private[cluster] class Master extends Actor with Stash {
def jarStoreService: Receive = {
case GetJarStoreServer =>
- jarStore.foreach(_ forward GetJarStoreServer)
+ jarStore forward GetJarStoreServer
}
def kvServiceMsgHandler: Receive = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
index ff74368..1b52e5d 100644
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -43,7 +43,7 @@ import org.apache.gearpump.cluster.WorkerToMaster._
import org.apache.gearpump.cluster.master.Master.MasterInfo
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
-import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
import org.apache.gearpump.metrics.Metrics.ReportMetrics
import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import org.apache.gearpump.util.ActorSystemBooter.Daemon
@@ -69,8 +69,7 @@ private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutS
private var masterInfo: MasterInfo = null
private var executorNameToActor = Map.empty[String, ActorRef]
private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
- private val jarStoreService = JarStoreService.get(systemConfig)
- jarStoreService.init(systemConfig, context.system)
+ private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
private val resourceUpdateTimeoutMs = 30000 // Milliseconds
@@ -171,7 +170,7 @@ private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutS
val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
- jarStoreService, executorProcLauncher))
+ jarStoreClient, executorProcLauncher))
executorNameToActor += actorName -> executor
resource = resource - launch.resource
@@ -339,7 +338,7 @@ private[cluster] object Worker {
launch: LaunchExecutor,
masterInfo: MasterInfo,
ioPool: ExecutionContext,
- jarStoreService: JarStoreService,
+ jarStoreClient: JarStoreClient,
procLauncher: ExecutorProcessLauncher) extends Actor {
import launch.{appId, executorId, resource}
@@ -407,7 +406,7 @@ private[cluster] object Worker {
val process = Future {
val jarPath = ctx.jar.map { appJar =>
val tempFile = File.createTempFile(appJar.name, ".jar")
- jarStoreService.copyToLocalFile(tempFile, appJar.filePath)
+ jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
val file = new URL("file:" + tempFile)
file.getFile
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f8f91664/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
new file mode 100644
index 0000000..ebaf354
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore.dfs
+
+import java.io.{InputStream, OutputStream}
+
+import com.typesafe.config.Config
+import org.apache.gearpump.jarstore.JarStore
+import org.apache.gearpump.util.Constants
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+
+/**
+ * DFSJarStore stores the uploaded jar on HDFS.
+ */
+class DFSJarStore extends JarStore {
+ private var rootPath: Path = null
+ override val scheme: String = "hdfs"
+
+ override def init(config: Config): Unit = {
+ rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
+ val fs = rootPath.getFileSystem(new Configuration())
+ if (!fs.exists(rootPath)) {
+ fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ }
+
+ /**
+ * Creates the file on JarStore.
+ *
+ * @param fileName name of the file to be created on JarStore.
+ * @return an OutputStream into which the data can be written.
+ */
+ override def createFile(fileName: String): OutputStream = {
+ val filePath = new Path(rootPath, fileName)
+ val fs = filePath.getFileSystem(new Configuration())
+ fs.create(filePath)
+ }
+
+ /**
+ * Gets the InputStream to read the file
+ *
+ * @param fileName name of the file to be read on JarStore.
+ * @return an InputStream from which the data can be read.
+ */
+ override def getFile(fileName: String): InputStream = {
+ val filePath = new Path(rootPath, fileName)
+ val fs = filePath.getFileSystem(new Configuration())
+ fs.open(filePath)
+ }
+}