You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:56 UTC
[46/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala
deleted file mode 100644
index 1877651..0000000
--- a/core/src/main/scala/io/gearpump/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io
-
-package object gearpump {
- type TimeStamp = Long
- val LatestTime = -1
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
deleted file mode 100644
index 0b9c57e..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import io.gearpump.Message
-
-/** Used by storm module to broadcast message to all downstream tasks */
-class BroadcastPartitioner extends MulticastPartitioner {
- private var lastPartitionNum = -1
- private var partitions = Array.empty[Int]
-
- override def getPartitions(
- msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
- if (partitionNum != lastPartitionNum) {
- partitions = (0 until partitionNum).toArray
- lastPartitionNum = partitionNum
- }
- partitions
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
deleted file mode 100644
index 062fc10..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import io.gearpump.Message
-
-/**
- * Will have the same parallelism with last processor
- * And each task in current processor will co-locate with task of last processor
- */
-class CoLocationPartitioner extends UnicastPartitioner {
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- currentPartitionId
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
deleted file mode 100644
index 6ba0cd6..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import io.gearpump.Message
-
-/**
- * Only make sense when the message has implemented the hashCode()
- * Otherwise, it will use Object.hashCode(), which will not return
- * same hash code after serialization and deserialization.
- */
-class HashPartitioner extends UnicastPartitioner {
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
deleted file mode 100644
index 69104c7..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import scala.reflect.ClassTag
-
-import org.apache.commons.lang.SerializationUtils
-
-import io.gearpump.Message
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task
- * of upstream processor A send to several tasks of downstream processor B.
- */
-sealed trait Partitioner extends Serializable
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
- * ONE-task {@literal ->} ONE-task mapping.
- */
-trait UnicastPartitioner extends Partitioner {
-
- /**
- * Gets the SINGLE downstream processor task index to send message to.
- *
- * @param msg Message you want to send
- * @param partitionNum How many tasks does the downstream processor have.
- * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call.
- *
- * @return ONE task index of downstream processor.
- */
- def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
-
- def getPartition(msg: Message, partitionNum: Int): Int = {
- getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
- }
-}
-
-trait MulticastPartitioner extends Partitioner {
-
- /**
- * Gets a list of downstream processor task indexes to send message to.
- *
- * @param upstreamTaskIndex Current sender task's task index.
- *
- */
- def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
-
- def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
- getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
- }
-}
-
-sealed trait PartitionerFactory {
-
- def name: String
-
- def partitioner: Partitioner
-}
-
-/** Stores the Partitioner in an object. To use it, user need to deserialize the object */
-class PartitionerObject(private[this] val _partitioner: Partitioner)
- extends PartitionerFactory with Serializable {
-
- override def name: String = partitioner.getClass.getName
-
- override def partitioner: Partitioner = {
- SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
- }
-}
-
-/** Store the partitioner in class Name, the user need to instantiate a new class */
-class PartitionerByClassName(partitionerClass: String)
- extends PartitionerFactory with Serializable {
-
- override def name: String = partitionerClass
- override def partitioner: Partitioner = {
- Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
- }
-}
-
-/**
- * @param partitionerFactory How we construct a Partitioner.
- */
-case class PartitionerDescription(partitionerFactory: PartitionerFactory)
-
-object Partitioner {
- val UNKNOWN_PARTITION_ID = -1
-
- def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = {
- PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
deleted file mode 100644
index ff962fa..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import scala.util.Random
-
-import io.gearpump.Message
-
-/**
- * The idea of ShuffleGroupingPartitioner is derived from Storm.
- * Messages are randomly distributed across the downstream's tasks in a way such that
- * each task is guaranteed to get an equal number of messages.
- */
-class ShuffleGroupingPartitioner extends UnicastPartitioner {
- private val random = new Random
- private var index = -1
- private var partitions = List.empty[Int]
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- index += 1
- if (partitions.isEmpty) {
- partitions = 0.until(partitionNum).toList
- partitions = random.shuffle(partitions)
- } else if (index >= partitionNum) {
- index = 0
- partitions = random.shuffle(partitions)
- }
- partitions(index)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
deleted file mode 100644
index 6b3c26e..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import java.util.Random
-
-import io.gearpump.Message
-
-/**
- * Round Robin partition the data to downstream processor tasks.
- */
-class ShufflePartitioner extends UnicastPartitioner {
- private var seed = 0
- private var count = 0
-
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-
- if (seed == 0) {
- seed = newSeed()
- }
-
- val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
- count = count + 1
- result
- }
-
- private def newSeed(): Int = new Random().nextInt()
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/Authenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/Authenticator.scala b/core/src/main/scala/io/gearpump/security/Authenticator.scala
deleted file mode 100644
index 73bc8e1..0000000
--- a/core/src/main/scala/io/gearpump/security/Authenticator.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-import scala.concurrent.{ExecutionContext, Future}
-
-import io.gearpump.security.Authenticator.AuthenticationResult
-
-/**
- * Authenticator for UI dashboard.
- *
- * Sub Class must implement a constructor with signature like this:
- * this(config: Config)
- */
-trait Authenticator {
-
- // TODO: Change the signature to return more attributes of user credentials...
- def authenticate(
- user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
-}
-
-object Authenticator {
-
- trait AuthenticationResult {
-
- def authenticated: Boolean
-
- def permissionLevel: Int
- }
-
- val UnAuthenticated = new AuthenticationResult {
- override val authenticated = false
- override val permissionLevel = -1
- }
-
- /** Guest can view but have no permission to submit app or write */
- val Guest = new AuthenticationResult {
- override val authenticated = true
- override val permissionLevel = 1000
- }
-
- /** User can submit app, kill app, but have no permission to add or remote machines */
- val User = new AuthenticationResult {
- override val authenticated = true
- override val permissionLevel = 1000 + Guest.permissionLevel
- }
-
- /** Super user */
- val Admin = new AuthenticationResult {
- override val authenticated = true
- override val permissionLevel = 1000 + User.permissionLevel
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
deleted file mode 100644
index 0743a3f..0000000
--- a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import com.typesafe.config.Config
-
-import io.gearpump.security.Authenticator.AuthenticationResult
-import io.gearpump.security.ConfigFileBasedAuthenticator._
-
-object ConfigFileBasedAuthenticator {
-
- private val ROOT = "gearpump.ui-security.config-file-based-authenticator"
- private val ADMINS = ROOT + "." + "admins"
- private val USERS = ROOT + "." + "users"
- private val GUESTS = ROOT + "." + "guests"
-
- private case class Credentials(
- admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) {
-
- def verify(user: String, password: String): AuthenticationResult = {
- if (admins.contains(user)) {
- if (verify(user, password, admins)) {
- Authenticator.Admin
- } else {
- Authenticator.UnAuthenticated
- }
- } else if (users.contains(user)) {
- if (verify(user, password, users)) {
- Authenticator.User
- } else {
- Authenticator.UnAuthenticated
- }
- } else if (guests.contains(user)) {
- if (verify(user, password, guests)) {
- Authenticator.Guest
- } else {
- Authenticator.UnAuthenticated
- }
- } else {
- Authenticator.UnAuthenticated
- }
- }
-
- private def verify(user: String, password: String, map: Map[String, String]): Boolean = {
- val storedPass = map(user)
- PasswordUtil.verify(password, storedPass)
- }
- }
-}
-
-/**
- * UI dashboard authenticator based on configuration file.
- *
- * It has three categories of users: admins, users, and guests.
- * admins have unlimited permission, like shutdown a cluster, add/remove machines.
- * users have limited permission to submit an application and etc..
- * guests can not submit/kill applications, but can view the application status.
- *
- * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find
- * information about how to configure this authenticator.
- *
- * [Security consideration]
- * It will keep one-way sha1 digest of password instead of password itself. The original password is
- * NOT kept in any way, so generally it is safe.
- *
- *
- * digesting flow (from original password to digest):
- * {{{
- * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) ->
- * base64Encode.
- * }}}
- *
- * Verification user input password with stored digest:
- * {{{
- * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest:
- * salt + sha1 -> compare the generated digest with the stored digest.
- * }}}
- */
-class ConfigFileBasedAuthenticator(config: Config) extends Authenticator {
-
- private val credentials = loadCredentials(config)
-
- override def authenticate(user: String, password: String, ec: ExecutionContext)
- : Future[AuthenticationResult] = {
- implicit val ctx = ec
- Future {
- credentials.verify(user, password)
- }
- }
-
- private def loadCredentials(config: Config): Credentials = {
- val admins = configToMap(config, ADMINS)
- val users = configToMap(config, USERS)
- val guests = configToMap(config, GUESTS)
- new Credentials(admins, users, guests)
- }
-
- private def configToMap(config: Config, path: String) = {
- import scala.collection.JavaConverters._
- config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
deleted file mode 100644
index 9bf40d2..0000000
--- a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-
-import java.security.MessageDigest
-import scala.util.Try
-
-import sun.misc.{BASE64Decoder, BASE64Encoder}
-
-/**
- * Util to verify whether user input password is valid or not.
- * It use sha1 to do the digesting.
- */
-object PasswordUtil {
- private val SALT_LENGTH = 8
-
- /**
- * Verifies user input password with stored digest:
- * {{{
- * base64Decode -> extract salt -> do sha1(salt, password) ->
- * generate digest: salt + sha1 -> compare the generated digest with the stored digest.
- * }}}
- */
- def verify(password: String, stored: String): Boolean = {
- Try {
- val decoded = base64Decode(stored)
- val salt = new Array[Byte](SALT_LENGTH)
- Array.copy(decoded, 0, salt, 0, SALT_LENGTH)
-
- hash(password, salt) == stored
- }.getOrElse(false)
- }
- /**
- * digesting flow (from original password to digest):
- * {{{
- * random salt byte array of length 8 ->
- * byte array of (salt + sha1(salt, password)) -> base64Encode
- * }}}
- */
- def hash(password: String): String = {
- // Salt generation 64 bits long
- val salt = new Array[Byte](SALT_LENGTH)
- new java.util.Random().nextBytes(salt)
- hash(password, salt)
- }
-
- private def hash(password: String, salt: Array[Byte]): String = {
- val digest = MessageDigest.getInstance("SHA-1")
- digest.reset()
- digest.update(salt)
- var input = digest.digest(password.getBytes("UTF-8"))
- digest.reset()
- input = digest.digest(input)
- val withSalt = salt ++ input
- base64Encode(withSalt)
- }
-
- private def base64Encode(data: Array[Byte]): String = {
- val endecoder = new BASE64Encoder()
- endecoder.encode(data)
- }
-
- private def base64Decode(data: String): Array[Byte] = {
- val decoder = new BASE64Decoder()
- decoder.decodeBuffer(data)
- }
-
- // scalastyle:off println
- private def help() = {
- Console.println("usage: gear io.gearpump.security.PasswordUtil -password <your password>")
- }
-
- def main(args: Array[String]): Unit = {
- if (args.length != 2 || args(0) != "-password") {
- help()
- } else {
- val pass = args(1)
- val result = hash(pass)
- Console.println("Here is the hashed password")
- Console.println("==============================")
- Console.println(result)
- }
- }
- // scalastyle:on println
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
deleted file mode 100644
index cb9d563..0000000
--- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import akka.actor.ExtendedActorSystem
-
-import io.gearpump.cluster.UserConfig
-
-/**
- * A build-in serializer framework using kryo
- *
- * NOTE: The Kryo here is a shaded version by Gearpump
- */
-class FastKryoSerializationFramework extends SerializationFramework {
- private var system: ExtendedActorSystem = null
-
- private lazy val pool = new ThreadLocal[Serializer]() {
- override def initialValue(): Serializer = {
- new FastKryoSerializer(system)
- }
- }
-
- override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {
- this.system = system
- }
-
- override def get(): Serializer = {
- pool.get()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
deleted file mode 100644
index 57b7b5e..0000000
--- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import akka.actor.ExtendedActorSystem
-
-import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
-import io.gearpump.objenesis.strategy.StdInstantiatorStrategy
-import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper
-import io.gearpump.serializer.FastKryoSerializer.KryoSerializationException
-import io.gearpump.util.LogUtil
-
-class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
-
- private val LOG = LogUtil.getLogger(getClass)
- private val config = system.settings.config
-
- private val kryoSerializer = new KryoSerializerWrapper(system)
- private val kryo = kryoSerializer.kryo
- val strategy = new DefaultInstantiatorStrategy
- strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy)
- kryo.setInstantiatorStrategy(strategy)
- private val kryoClazz = new GearpumpSerialization(config).customize(kryo)
-
- override def serialize(message: Any): Array[Byte] = {
- try {
- kryoSerializer.toBinary(message)
- } catch {
- case ex: java.lang.IllegalArgumentException =>
- val clazz = message.getClass
- val error = s"""
- | ${ex.getMessage}
- |You can also register the class by providing a configuration with serializer
- |defined,
- |
- |gearpump{
- | serializers {
- | ## Follow this format when adding new serializer for new message types
- | # "yourpackage.YourClass" = "yourpackage.YourSerializerForThisClass"
- |
- | ## If you intend to use default serializer for this class, then you can write this
- | # "yourpackage.YourClass" = ""
- | }
- |}
- |
- |If you want to register the serializer globally, you need to change
- |gear.conf on every worker in the cluster; if you only want to register
- |the serializer for a single streaming application, you need to create
- |a file under conf/ named application.conf, and add the above configuration
- |into application.conf. To verify whether the configuration is effective,
- |you can browser your UI http://{UI Server Host}:8090/api/v1.0/app/{appId}/config,
- |and check whether your custom serializer is added.
- """.stripMargin
-
- LOG.error(error, ex)
- throw new KryoSerializationException(error, ex)
- }
- }
-
- override def deserialize(msg: Array[Byte]): Any = {
- kryoSerializer.fromBinary(msg)
- }
-}
-
-object FastKryoSerializer {
- class KryoSerializationException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
deleted file mode 100644
index a7eb6cf..0000000
--- a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
-import io.gearpump.util.{Constants, LogUtil}
-
-class GearpumpSerialization(config: Config) {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- def customize(kryo: Kryo): Unit = {
-
- val serializationMap = configToMap(config, Constants.GEARPUMP_SERIALIZERS)
-
- serializationMap.foreach { kv =>
- val (key, value) = kv
- val keyClass = Class.forName(key)
-
- if (value == null || value.isEmpty) {
-
- // Use default serializer for this class type
- kryo.register(keyClass)
- } else {
- val valueClass = Class.forName(value)
- val register = kryo.register(keyClass,
- valueClass.newInstance().asInstanceOf[KryoSerializer[_]])
- LOG.debug(s"Registering ${keyClass}, id: ${register.getId}")
- }
- }
- kryo.setReferences(false)
-
- // Requires the user to register the class first before using
- kryo.setRegistrationRequired(true)
- }
-
- private final def configToMap(config: Config, path: String) = {
- import scala.collection.JavaConverters._
- config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
deleted file mode 100644
index 4947dcc..0000000
--- a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import akka.actor.ExtendedActorSystem
-
-import io.gearpump.cluster.UserConfig
-
-/**
- * User are allowed to use a customized serialization framework by extending this
- * interface.
- */
-trait SerializationFramework {
- def init(system: ExtendedActorSystem, config: UserConfig)
-
- /**
- *
- * Need to be thread safe
- *
- * Get a serializer to use.
- * Note: this method can be called in a multi-thread environment. It's the
- * responsibility of SerializationFramework Developer to assure this method
- * is thread safe.
- *
- * To be thread-safe, one recommendation would be using a thread local pool
- * to maintain reference to Serializer of same thread.
- *
- * @return
- */
- def get(): Serializer
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/Serializer.scala b/core/src/main/scala/io/gearpump/serializer/Serializer.scala
deleted file mode 100644
index ff8b147..0000000
--- a/core/src/main/scala/io/gearpump/serializer/Serializer.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-/**
- * User defined message serializer
- */
-trait Serializer {
- def serialize(message: Any): Array[Byte]
-
- def deserialize(msg: Array[Byte]): Any
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/Express.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/Express.scala b/core/src/main/scala/io/gearpump/transport/Express.scala
deleted file mode 100644
index 101b841..0000000
--- a/core/src/main/scala/io/gearpump/transport/Express.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport
-
-import scala.collection.immutable.LongMap
-import scala.concurrent._
-
-import akka.actor._
-import akka.agent.Agent
-import org.slf4j.Logger
-
-import io.gearpump.transport.netty.Client.Close
-import io.gearpump.transport.netty.{Context, TaskMessage}
-import io.gearpump.util.LogUtil
-
-trait ActorLookupById {
-
- /** Lookup actor ref for local task actor by providing a TaskId (TaskId.toLong) */
- def lookupLocalActor(id: Long): Option[ActorRef]
-}
-
-/**
- * Custom networking layer.
- *
- * It will translate long sender/receiver address to shorter ones to reduce
- * the network overhead.
- */
-class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById {
-
- import system.dispatcher
-
- import io.gearpump.transport.Express._
- val localActorMap = Agent(LongMap.empty[ActorRef])
- val remoteAddressMap = Agent(Map.empty[Long, HostPort])
-
- val remoteClientMap = Agent(Map.empty[HostPort, ActorRef])
-
- val conf = system.settings.config
-
- lazy val (context, serverPort, localHost) = init
-
- lazy val init = {
- LOG.info(s"Start Express init ...${system.name}")
- val context = new Context(system, conf)
- val serverPort = context.bind("netty-server", this)
- val localHost = HostPort(system.provider.getDefaultAddress.host.get, serverPort)
- LOG.info(s"binding to netty server $localHost")
-
- system.registerOnTermination(new Runnable {
- override def run(): Unit = context.close()
- })
- (context, serverPort, localHost)
- }
-
- def unregisterLocalActor(id: Long): Unit = {
- localActorMap.sendOff(_ - id)
- }
-
- /** Start Netty client actors to connect to remote machines */
- def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
- val clientsToClose = remoteClientMap.get().filterKeys(!hostPorts.contains(_)).keySet
- closeClients(clientsToClose)
- hostPorts.toList.foldLeft(Future(Map.empty[HostPort, ActorRef])) { (future, hostPort) =>
- remoteClientMap.alter { map =>
- if (!map.contains(hostPort)) {
- val actor = context.connect(hostPort)
- map + (hostPort -> actor)
- } else {
- map
- }
- }
- }
- }
-
- def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
- remoteClientMap.alter { map =>
- map.filterKeys(hostPorts.contains).foreach { hostAndClient =>
- val (_, client) = hostAndClient
- client ! Close
- }
- map -- hostPorts
- }
- }
-
- def registerLocalActor(id: Long, actor: ActorRef): Unit = {
- LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}")
- init
- localActorMap.sendOff(_ + (id -> actor))
- }
-
- def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id)
-
- def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id)
-
- /** Send message to remote task */
- def transport(taskMessage: TaskMessage, remote: HostPort): Unit = {
-
- val remoteClient = remoteClientMap.get.get(remote)
- if (remoteClient.isDefined) {
- remoteClient.get.tell(taskMessage, Actor.noSender)
- } else {
- val errorMsg = s"Clients has not been launched properly before transporting messages, " +
- s"the destination is $remote"
- LOG.error(errorMsg)
- throw new Exception(errorMsg)
- }
- }
-}
-
-/** A customized transport layer by using Akka extension */
-object Express extends ExtensionId[Express] with ExtensionIdProvider {
- val LOG: Logger = LogUtil.getLogger(getClass)
-
- override def get(system: ActorSystem): Express = super.get(system)
-
- override def lookup: ExtensionId[Express] = Express
-
- override def createExtension(system: ExtendedActorSystem): Express = new Express(system)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/HostPort.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/HostPort.scala b/core/src/main/scala/io/gearpump/transport/HostPort.scala
deleted file mode 100644
index 40c4342..0000000
--- a/core/src/main/scala/io/gearpump/transport/HostPort.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport
-
-case class HostPort(host: String, port: Int) {
- def toTuple: (String, Int) = {
- (host, port)
- }
-}
-
-object HostPort {
- def apply(address: String): HostPort = {
- val hostAndPort = address.split(":")
- new HostPort(hostAndPort(0), hostAndPort(1).toInt)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Client.scala b/core/src/main/scala/io/gearpump/transport/netty/Client.scala
deleted file mode 100644
index d5960ad..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/Client.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import java.net.{ConnectException, InetSocketAddress}
-import java.nio.channels.ClosedChannelException
-import java.util
-import java.util.Random
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
-
-import akka.actor.Actor
-import org.jboss.netty.bootstrap.ClientBootstrap
-import org.jboss.netty.channel._
-import org.slf4j.Logger
-
-import io.gearpump.transport.HostPort
-import io.gearpump.util.LogUtil
-
-/**
- * Netty Client implemented as an actor, on the other side, there is a netty server Actor.
- * All messages sent to this actor will be forwarded to remote machine.
- */
-class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor {
- import io.gearpump.transport.netty.Client._
-
- val name = s"netty-client-$hostPort"
-
- private final var bootstrap: ClientBootstrap = null
- private final val random: Random = new Random
- private val serializer = conf.newTransportSerializer
- private var channel: Channel = null
-
- var batch = new util.ArrayList[TaskMessage]
-
- private val init = {
- bootstrap = NettyUtil.createClientBootStrap(factory,
- new ClientPipelineFactory(name, conf), conf.buffer_size)
- self ! Connect(0)
- }
-
- def receive: Receive = messageHandler orElse connectionHandler
-
- def messageHandler: Receive = {
- case msg: TaskMessage =>
- batch.add(msg)
- case flush@Flush(flushChannel) =>
- if (channel != flushChannel) {
- Unit // Drop, as it belong to old channel flush message
- } else if (batch.size > 0 && flushChannel.isWritable) {
- send(flushChannel, batch.iterator)
- batch.clear()
- self ! flush
- } else {
- import context.dispatcher
- context.system.scheduler.scheduleOnce(
- new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
- }
- }
-
- def connectionHandler: Receive = {
- case ChannelReady(channel) =>
- this.channel = channel
- self ! Flush(channel)
- case Connect(tries) =>
- if (null == channel) {
- connect(tries)
- } else {
- LOG.error("there already exist a channel, will not establish a new one...")
- }
- case CompareAndReconnectIfEqual(oldChannel) =>
- if (oldChannel == channel) {
- channel = null
- self ! Connect(0)
- }
- case Close =>
- close()
- context.become(closed)
- }
-
- def closed: Receive = {
- case msg: AnyRef =>
- LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...")
- }
-
- private def connect(tries: Int): Unit = {
- LOG.info(s"netty client try to connect to $name, tries: $tries")
- if (tries <= conf.max_retries) {
- val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port)
- val future = bootstrap.connect(remote_addr)
- future success { current =>
- LOG.info(s"netty client successfully connectted to $name, tries: $tries")
- self ! ChannelReady(current)
- } fail { (current, ex) =>
- LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}")
- current.close()
- import context.dispatcher
- context.system.scheduler.scheduleOnce(
- new FiniteDuration(
- getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
- }
- } else {
- LOG.error(s"fail to connect to a remote host $name after retied $tries ...")
- self ! Close
- }
- }
-
- private def send(flushChannel: Channel, msgs: util.Iterator[TaskMessage]) {
- var messageBatch: MessageBatch = null
-
- while (msgs.hasNext) {
- val message: TaskMessage = msgs.next()
- if (null == messageBatch) {
- messageBatch = new MessageBatch(conf.messageBatchSize, serializer)
- }
- messageBatch.add(message)
- if (messageBatch.isFull) {
- val toBeFlushed: MessageBatch = messageBatch
- flushRequest(flushChannel, toBeFlushed)
- messageBatch = null
- }
- }
- if (null != messageBatch && !messageBatch.isEmpty) {
- flushRequest(flushChannel, messageBatch)
- }
- }
-
- private def close() {
- LOG.info(s"closing netty client $name...")
- if (null != channel) {
- channel.close()
- channel = null
- }
- batch = null
- }
-
- override def postStop(): Unit = {
- close()
- }
-
- private def flushRequest(channel: Channel, requests: MessageBatch) {
- val future: ChannelFuture = channel.write(requests)
- future.fail { (channel, ex) =>
- if (channel.isOpen) {
- channel.close
- }
- LOG.error(s"failed to send requests " +
- s"to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}")
- if (!ex.isInstanceOf[ClosedChannelException]) {
- LOG.error(ex.getMessage, ex)
- }
- self ! CompareAndReconnectIfEqual(channel)
- }
- }
-
- private def getSleepTimeMs(retries: Int): Long = {
- if (retries > 30) {
- conf.max_sleep_ms
- } else {
- val backoff = 1 << retries
- val sleepMs = conf.base_sleep_ms * Math.max(1, random.nextInt(backoff))
- if (sleepMs < conf.max_sleep_ms) sleepMs else conf.max_sleep_ms
- }
- }
-
- private def isChannelWritable = (null != channel) && channel.isWritable
-}
-
-object Client {
- val LOG: Logger = LogUtil.getLogger(getClass)
-
- // Reconnect if current channel equals channel
- case class CompareAndReconnectIfEqual(channel: Channel)
-
- case class Connect(tries: Int)
- case class ChannelReady(chanel: Channel)
- case object Close
-
- case class Flush(channel: Channel)
-
- class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler {
-
- override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) {
- event.getCause match {
- case ex: ConnectException => Unit
- case ex: ClosedChannelException =>
- LOG.warn("exception found when trying to close netty connection", ex.getMessage)
- case ex => LOG.error("Connection failed " + name, ex)
- }
- }
- }
-
- class ClientPipelineFactory(name: String, conf: NettyConfig) extends ChannelPipelineFactory {
- def getPipeline: ChannelPipeline = {
- val pipeline: ChannelPipeline = Channels.pipeline
- pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
- pipeline.addLast("encoder", new MessageEncoder)
- pipeline.addLast("handler", new ClientErrorHandler(name))
- pipeline
- }
- }
-
- implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = {
- new ChannelFutureOps(channel)
- }
-
- class ChannelFutureOps(channelFuture: ChannelFuture) {
-
- def success(handler: (Channel => Unit)): ChannelFuture = {
- channelFuture.addListener(new ChannelFutureListener {
- def operationComplete(future: ChannelFuture) {
- if (future.isSuccess) {
- handler(future.getChannel)
- }
- }
- })
- channelFuture
- }
-
- def fail(handler: ((Channel, Throwable) => Unit)): ChannelFuture = {
- channelFuture.addListener(new ChannelFutureListener {
- def operationComplete(future: ChannelFuture) {
- if (!future.isSuccess) {
- handler(future.getChannel, future.getCause)
- }
- }
- })
- channelFuture
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Context.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Context.scala b/core/src/main/scala/io/gearpump/transport/netty/Context.scala
deleted file mode 100644
index 9a9ee29..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/Context.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import java.io.Closeable
-import java.util.concurrent._
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.Config
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.slf4j.Logger
-
-import io.gearpump.transport.netty.Server.ServerPipelineFactory
-import io.gearpump.transport.{ActorLookupById, HostPort}
-import io.gearpump.util.{Constants, LogUtil}
-
-object Context {
- private final val LOG: Logger = LogUtil.getLogger(getClass)
-}
-
-/** Netty Context */
-class Context(system: ActorSystem, conf: NettyConfig) extends IContext {
- import io.gearpump.transport.netty.Context._
-
- def this(system: ActorSystem, conf: Config) {
- this(system, new NettyConfig(conf))
- }
-
- private val closeHandler = new ConcurrentLinkedQueue[Closeable]()
- private val nettyDispatcher = system.settings.config.getString(Constants.NETTY_DISPATCHER)
- val maxWorkers: Int = 1
-
- private lazy val clientChannelFactory: NioClientSocketChannelFactory = {
- val bossFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-boss")
- val workerFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-worker")
- val channelFactory =
- new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory), maxWorkers)
-
- closeHandler.add(new Closeable {
- override def close(): Unit = {
- LOG.info("Closing all client resources....")
- channelFactory.releaseExternalResources
- }
- })
- channelFactory
- }
-
- def bind(
- name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = true,
- inputPort: Int = 0): Int = {
- // TODO: whether we should expose it as application config?
- val server = system.actorOf(Props(classOf[Server], name, conf, lookupActor,
- deserializeFlag).withDispatcher(nettyDispatcher), name)
- val (port, channel) = NettyUtil.newNettyServer(name,
- new ServerPipelineFactory(server, conf), 5242880, inputPort)
- val factory = channel.getFactory
- closeHandler.add(new Closeable {
- override def close(): Unit = {
- system.stop(server)
- channel.close()
- LOG.info("Closing all server resources....")
- factory.releaseExternalResources
- }
- })
- port
- }
-
- def connect(hostPort: HostPort): ActorRef = {
- val client = system.actorOf(Props(classOf[Client], conf, clientChannelFactory, hostPort)
- .withDispatcher(nettyDispatcher))
- closeHandler.add(new Closeable {
- override def close(): Unit = {
- LOG.info("closing Client actor....")
- system.stop(client)
- }
- })
-
- client
- }
-
- /**
- * terminate this context
- */
- def close(): Unit = {
-
- LOG.info(s"Context.term, cleanup resources...., " +
- s"we have ${closeHandler.size()} items to close...")
-
- // Cleans up resource in reverse order so that client actor can be cleaned
- // before clientChannelFactory
- closeHandler.iterator().asScala.toList.reverse.foreach(_.close())
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala b/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
deleted file mode 100644
index 56b2f7c..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import akka.actor.ActorRef
-
-import io.gearpump.transport.{ActorLookupById, HostPort}
-
-trait IContext {
-
- /**
- * Create a Netty server connection.
- */
- def bind(name: String, lookupActor: ActorLookupById, deserializeFlag: Boolean, port: Int): Int
-
- /**
- * Create a Netty client actor
- */
- def connect(hostPort: HostPort): ActorRef
-
- /**
- * Close resource for this context
- */
- def close()
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala b/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
deleted file mode 100644
index a62eff5..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import com.typesafe.config.Config
-
-import io.gearpump.util.Constants
-
-class NettyConfig(conf: Config) {
-
- val buffer_size = conf.getInt(Constants.NETTY_BUFFER_SIZE)
- val max_retries = conf.getInt(Constants.NETTY_MAX_RETRIES)
- val base_sleep_ms = conf.getInt(Constants.NETTY_BASE_SLEEP_MS)
- val max_sleep_ms = conf.getInt(Constants.NETTY_MAX_SLEEP_MS)
- val messageBatchSize = conf.getInt(Constants.NETTY_MESSAGE_BATCH_SIZE)
- val flushCheckInterval = conf.getInt(Constants.NETTY_FLUSH_CHECK_INTERVAL)
-
- def newTransportSerializer: ITransportMessageSerializer = {
- Class.forName(
- conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER))
- .newInstance().asInstanceOf[ITransportMessageSerializer]
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala b/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
deleted file mode 100644
index 3e746af..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, ThreadFactory}
-
-import org.jboss.netty.bootstrap.{ClientBootstrap, ServerBootstrap}
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipelineFactory}
-
-object NettyUtil {
-
- def newNettyServer(
- name: String,
- pipelineFactory: ChannelPipelineFactory,
- buffer_size: Int,
- inputPort: Int = 0): (Int, Channel) = {
- val bossFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-boss")
- val workerFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-worker")
- val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory), 1)
-
- val bootstrap = createServerBootStrap(factory, pipelineFactory, buffer_size)
- val channel: Channel = bootstrap.bind(new InetSocketAddress(inputPort))
- val port = channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort()
- (port, channel)
- }
-
- def createServerBootStrap(
- factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
- : ServerBootstrap = {
- val bootstrap = new ServerBootstrap(factory)
- bootstrap.setOption("child.tcpNoDelay", true)
- bootstrap.setOption("child.receiveBufferSize", buffer_size)
- bootstrap.setOption("child.keepAlive", true)
- bootstrap.setPipelineFactory(pipelineFactory)
- bootstrap
- }
-
- def createClientBootStrap(
- factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
- : ClientBootstrap = {
- val bootstrap = new ClientBootstrap(factory)
- bootstrap.setOption("tcpNoDelay", true)
- bootstrap.setOption("sendBufferSize", buffer_size)
- bootstrap.setOption("keepAlive", true)
- bootstrap.setPipelineFactory(pipelineFactory)
- bootstrap
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Server.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Server.scala b/core/src/main/scala/io/gearpump/transport/netty/Server.scala
deleted file mode 100644
index 9a9d79b..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/Server.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import java.util
-import scala.collection.JavaConverters._
-import scala.collection.immutable.IntMap
-import scala.concurrent.Future
-
-import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem}
-import org.jboss.netty.channel._
-import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}
-import org.slf4j.Logger
-
-import io.gearpump.transport.ActorLookupById
-import io.gearpump.util.{AkkaHelper, LogUtil}
-
-/** Netty server actor, message received will be forward to the target on the address line. */
-class Server(
- name: String, conf: NettyConfig, lookupActor: ActorLookupById, deserializeFlag: Boolean)
- extends Actor {
-
- private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = name)
- import io.gearpump.transport.netty.Server._
-
- val allChannels: ChannelGroup = new DefaultChannelGroup("gearpump-server")
-
- val system = context.system.asInstanceOf[ExtendedActorSystem]
-
- def receive: Receive = msgHandler orElse channelManager
- // As we will only transfer TaskId on the wire,
- // this object will translate taskId to or from ActorRef
- private val taskIdActorRefTranslation = new TaskIdActorRefTranslation(context)
-
- def channelManager: Receive = {
- case AddChannel(channel) => allChannels.add(channel)
- case CloseChannel(channel) =>
- import context.dispatcher
- Future {
- channel.close.awaitUninterruptibly
- allChannels.remove(channel)
- }
- }
-
- def msgHandler: Receive = {
- case MsgBatch(msgs) =>
- msgs.asScala.groupBy(_.targetTask()).foreach { taskBatch =>
- val (taskId, taskMessages) = taskBatch
- val actor = lookupActor.lookupLocalActor(taskId)
-
- if (actor.isEmpty) {
- LOG.error(s"Cannot find actor for id: $taskId...")
- } else taskMessages.foreach { taskMessage =>
- actor.get.tell(taskMessage.message(),
- taskIdActorRefTranslation.translateToActorRef(taskMessage.sessionId()))
- }
- }
- }
-
- override def postStop(): Unit = {
- allChannels.close.awaitUninterruptibly
- }
-}
-
-object Server {
-
- class ServerPipelineFactory(server: ActorRef, conf: NettyConfig) extends ChannelPipelineFactory {
- def getPipeline: ChannelPipeline = {
- val pipeline: ChannelPipeline = Channels.pipeline
- pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
- pipeline.addLast("encoder", new MessageEncoder)
- pipeline.addLast("handler", new ServerHandler(server))
- pipeline
- }
- }
-
- class ServerHandler(server: ActorRef) extends SimpleChannelUpstreamHandler {
- private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = server.path.name)
-
- override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
- server ! AddChannel(e.getChannel)
- }
-
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- val msgs: util.List[TaskMessage] = e.getMessage.asInstanceOf[util.List[TaskMessage]]
- if (msgs != null) {
- server ! MsgBatch(msgs)
- }
- }
-
- override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
- LOG.error("server errors in handling the request", e.getCause)
- server ! CloseChannel(e.getChannel)
- }
- }
-
- class TaskIdActorRefTranslation(context: ActorContext) {
- private var taskIdtoActorRef = IntMap.empty[ActorRef]
-
- /** 1-1 mapping from session id to fake ActorRef */
- def translateToActorRef(sessionId: Int): ActorRef = {
- if (!taskIdtoActorRef.contains(sessionId)) {
-
- // A fake ActorRef for performance optimization.
- val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId")
- taskIdtoActorRef += sessionId -> actorRef
- }
- taskIdtoActorRef.get(sessionId).get
- }
- }
-
- case class AddChannel(channel: Channel)
-
- case class CloseChannel(channel: Channel)
-
- case class MsgBatch(messages: java.lang.Iterable[TaskMessage])
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala b/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
deleted file mode 100644
index 25a34d9..0000000
--- a/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.util.concurrent.{TimeUnit, TimeoutException}
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-
-import akka.actor._
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.util.LogUtil.ProcessType
-
-/**
- * ActorSystemBooter start a new JVM process to boot an actor system.
- * All executors are started by ActorSystemBooter
- *
- * It send the system address to "report back actor"
- */
-class ActorSystemBooter(config: Config) {
- import io.gearpump.util.ActorSystemBooter._
-
- def boot(name: String, reportBackActor: String): ActorSystem = {
- val system = ActorSystem(name, config)
- // Daemon path: http://{system}@{ip}:{port}/daemon
- system.actorOf(Props(classOf[Daemon], name, reportBackActor), "daemon")
- system
- }
-}
-
-object ActorSystemBooter {
-
- def apply(config: Config): ActorSystemBooter = new ActorSystemBooter(config)
-
- def main(args: Array[String]) {
- val name = args(0)
- val reportBack = args(1)
- val config = ClusterConfig.default()
-
- LogUtil.loadConfiguration(config, ProcessType.APPLICATION)
-
- val debugPort = Option(System.getProperty(Constants.GEARPUMP_REMOTE_DEBUG_PORT))
- debugPort.foreach { port =>
- val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
- LOG.info("==========================================")
- LOG.info("Remote debug port: " + port)
- LOG.info("==========================================")
- }
-
- val system = apply(config).boot(name, reportBack)
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run(): Unit = {
- val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
- LOG.info("Maybe we have received a SIGINT signal from parent process, " +
- "start to cleanup resources....")
- system.terminate()
- }
- })
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- case class BindLifeCycle(actor: ActorRef)
- case class CreateActor(prop: Props, name: String)
- case class ActorCreated(actor: ActorRef, name: String)
- case class CreateActorFailed(name: String, reason: Throwable)
-
- case class RegisterActorSystem(systemPath: String)
-
- /**
- * This actor system will watch for parent,
- * If parent dies, this will also die
- */
- case class ActorSystemRegistered(bindLifeWith: ActorRef)
- case class RegisterActorSystemFailed(reason: Throwable)
-
- object RegisterActorSystemTimeOut
-
- class Daemon(val name: String, reportBack: String) extends Actor {
- val LOG: Logger = LogUtil.getLogger(getClass, context = name)
-
- val username = Option(System.getProperty(Constants.GEARPUMP_USERNAME)).getOrElse("not_defined")
- LOG.info(s"RegisterActorSystem to ${reportBack}, current user: $username")
-
- val reportBackActor = context.actorSelection(reportBack)
- reportBackActor ! RegisterActorSystem(ActorUtil.getSystemAddress(context.system).toString)
-
- implicit val executionContext = context.dispatcher
- val timeout = context.system.scheduler.scheduleOnce(Duration(25, TimeUnit.SECONDS),
- self, RegisterActorSystemFailed(new TimeoutException))
-
- context.become(waitForRegisterResult)
-
- def receive: Receive = null
-
- def waitForRegisterResult: Receive = {
- case ActorSystemRegistered(parent) =>
- timeout.cancel()
- context.watch(parent)
- context.become(waitCommand)
- case RegisterActorSystemFailed(ex) =>
- LOG.error("RegisterActorSystemFailed", ex)
- timeout.cancel()
- context.stop(self)
- }
-
- def waitCommand: Receive = {
- case BindLifeCycle(actor) =>
- LOG.info(s"ActorSystem $name Binding life cycle with actor: $actor")
- context.watch(actor)
- case create@CreateActor(props: Props, name: String) =>
- LOG.info(s"creating actor $name")
- val actor = Try(context.actorOf(props, name))
- actor match {
- case Success(actor) =>
- sender ! ActorCreated(actor, name)
- case Failure(e) =>
- sender ! CreateActorFailed(props.clazz.getName, e)
- }
- case PoisonPill =>
- context.stop(self)
- case Terminated(actor) =>
- LOG.info(s"System $name Watched actor is terminated $actor")
- context.stop(self)
- }
-
- override def postStop(): Unit = {
- LOG.info(s"ActorSystem $name is shutting down...")
- context.system.terminate()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
deleted file mode 100644
index d5f48a7..0000000
--- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.actor.Actor.Receive
-import akka.actor._
-import akka.pattern.ask
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
-import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
-import io.gearpump.cluster.MasterToAppMaster.WorkerList
-import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult}
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, StartExecutorSystems}
-import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.transport.HostPort
-
-object ActorUtil {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- def getSystemAddress(system: ActorSystem): Address = {
- system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
- }
-
- def getFullPath(system: ActorSystem, path: ActorPath): String = {
- path.toStringWithAddress(getSystemAddress(system))
- }
-
- def getHostname(actor: ActorRef): String = {
- val path = actor.path
- path.address.host.getOrElse("localhost")
- }
-
- def defaultMsgHandler(actor: ActorRef): Receive = {
- case msg: Any =>
- LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, forwarded from $actor")
- }
-
- def printActorSystemTree(system: ActorSystem): Unit = {
- val extendedSystem = system.asInstanceOf[ExtendedActorSystem]
- val clazz = system.getClass
- val m = clazz.getDeclaredMethod("printTree")
- m.setAccessible(true)
- LOG.info(m.invoke(system).asInstanceOf[String])
- }
-
- /** Checks whether a actor is child actor by simply examining name */
- // TODO: fix this, we should also check the path to root besides name
- def isChildActorPath(parent: ActorRef, child: ActorRef): Boolean = {
- if (null != child) {
- parent.path.name == child.path.parent.name
- } else {
- false
- }
- }
-
- def actorNameForExecutor(appId: Int, executorId: Int): String = "app" + appId + "-executor" +
- executorId
-
- // TODO: Currently we explicitly require the master contacts to be started with this path pattern
- // akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER
- def getMasterActorPath(master: HostPort): ActorPath = {
- import io.gearpump.util.Constants.MASTER
- ActorPath.fromString(s"akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER")
- }
-
- def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig,
- sender: ActorRef)(implicit executor: scala.concurrent.ExecutionContext): Unit = {
- implicit val timeout = Constants.FUTURE_TIMEOUT
-
- (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list =>
- val resources = list.workers.map {
- workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)
- }.toArray
-
- master.tell(StartExecutorSystems(resources, executorJvmConfig), sender)
- }
- }
-
- def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext)
- : Future[T] = {
- implicit val timeout = Constants.FUTURE_TIMEOUT
- val appmaster = askActor[ResolveAppIdResult](master, ResolveAppId(appId)).flatMap { result =>
- if (result.appMaster.isSuccess) {
- Future.successful(result.appMaster.get)
- } else {
- Future.failed(result.appMaster.failed.get)
- }
- }
- appmaster.flatMap(askActor[T](_, msg))
- }
-
- def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext)
- : Future[T] = {
- implicit val timeout = Constants.FUTURE_TIMEOUT
- val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId))
- .flatMap { result =>
- if (result.worker.isSuccess) {
- Future.successful(result.worker.get)
- } else {
- Future.failed(result.worker.failed.get)
- }
- }
- worker.flatMap(askActor[T](_, msg))
- }
-
- def askActor[T](actor: ActorRef, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
- implicit val timeout = Constants.FUTURE_TIMEOUT
- (actor ? msg).asInstanceOf[Future[T]]
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/AkkaApp.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/AkkaApp.scala b/core/src/main/scala/io/gearpump/util/AkkaApp.scala
deleted file mode 100644
index 2b0bf61..0000000
--- a/core/src/main/scala/io/gearpump/util/AkkaApp.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.util.Try
-
-import io.gearpump.cluster.ClusterConfig
-
-/**
- * A Main class helper to load Akka configuration automatically.
- */
-trait AkkaApp {
-
- type Config = com.typesafe.config.Config
-
- def main(akkaConf: Config, args: Array[String]): Unit
-
- def help(): Unit
-
- protected def akkaConfig: Config = {
- ClusterConfig.default()
- }
-
- def main(args: Array[String]): Unit = {
- Try {
- main(akkaConfig, args)
- }.failed.foreach { ex => help(); throw ex }
- }
-}