Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:56 UTC

[46/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala
deleted file mode 100644
index 1877651..0000000
--- a/core/src/main/scala/io/gearpump/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io
-
-package object gearpump {
-  type TimeStamp = Long
-  val LatestTime = -1
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
deleted file mode 100644
index 0b9c57e..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import io.gearpump.Message
-
-/** Used by the Storm module to broadcast a message to all downstream tasks */
-class BroadcastPartitioner extends MulticastPartitioner {
-  private var lastPartitionNum = -1
-  private var partitions = Array.empty[Int]
-
-  override def getPartitions(
-      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
-    if (partitionNum != lastPartitionNum) {
-      partitions = (0 until partitionNum).toArray
-      lastPartitionNum = partitionNum
-    }
-    partitions
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
deleted file mode 100644
index 062fc10..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import io.gearpump.Message
-
-/**
- * Has the same parallelism as the upstream processor, and each task in the current
- * processor is co-located with the corresponding task of the upstream processor.
- */
-class CoLocationPartitioner extends UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    currentPartitionId
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
deleted file mode 100644
index 6ba0cd6..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import io.gearpump.Message
-
-/**
- * Only makes sense when the message payload implements hashCode().
- * Otherwise Object.hashCode() is used, which will not return the
- * same hash code after serialization and deserialization.
- */
-class HashPartitioner extends UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
-  }
-}
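
For illustration, a minimal sketch of the hash routing above. It assumes the payload is a
case class (whose hashCode() is structural and therefore stable across serialization) and
that Message has a single-argument constructor wrapping the payload in its msg field:

    case class WordKey(word: String)   // stable, structural hashCode

    val partitioner = new HashPartitioner
    // The same key always lands on the same downstream task index:
    // (WordKey("apple").hashCode() & Integer.MAX_VALUE) % 4
    val partition = partitioner.getPartition(Message(WordKey("apple")), 4, 0)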

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
deleted file mode 100644
index 69104c7..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import scala.reflect.ClassTag
-
-import org.apache.commons.lang.SerializationUtils
-
-import io.gearpump.Message
-
-/**
- * For a processor chain A (3 tasks) {@literal ->} B (3 tasks), the partitioner decides how ONE
- * task of upstream processor A sends to the several tasks of downstream processor B.
- */
-sealed trait Partitioner extends Serializable
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
- * ONE-task {@literal ->} ONE-task mapping.
- */
-trait UnicastPartitioner extends Partitioner {
-
-  /**
-   * Gets the SINGLE downstream processor task index to send the message to.
-   *
-   * @param msg Message you want to send
-   * @param partitionNum How many tasks the downstream processor has.
-   * @param upstreamTaskIndex Index of the upstream task that triggers the getPartition() call.
-   *
-   * @return ONE task index of the downstream processor.
-   */
-  def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
-
-  def getPartition(msg: Message, partitionNum: Int): Int = {
-    getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
-  }
-}
-
-trait MulticastPartitioner extends Partitioner {
-
-  /**
-   * Gets the list of downstream processor task indexes to send the message to.
-   *
-   * @param upstreamTaskIndex Task index of the current sender task.
-   *
-   */
-  def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
-
-  def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
-    getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
-  }
-}
-
-sealed trait PartitionerFactory {
-
-  def name: String
-
-  def partitioner: Partitioner
-}
-
-/** Stores the Partitioner in an object. To use it, the user needs to deserialize the object. */
-class PartitionerObject(private[this] val _partitioner: Partitioner)
-  extends PartitionerFactory with Serializable {
-
-  override def name: String = partitioner.getClass.getName
-
-  override def partitioner: Partitioner = {
-    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
-  }
-}
-
-/** Stores the partitioner by class name; the user needs to instantiate a new instance of the class. */
-class PartitionerByClassName(partitionerClass: String)
-  extends PartitionerFactory with Serializable {
-
-  override def name: String = partitionerClass
-  override def partitioner: Partitioner = {
-    Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
-  }
-}
-
-/**
- * @param partitionerFactory How we construct a Partitioner.
- */
-case class PartitionerDescription(partitionerFactory: PartitionerFactory)
-
-object Partitioner {
-  val UNKNOWN_PARTITION_ID = -1
-
-  def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = {
-    PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName))
-  }
-}
\ No newline at end of file
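
To tie the pieces above together, a hedged sketch of a custom UnicastPartitioner (the class
and its route-by-key-length rule are hypothetical) and how Partitioner.apply wraps it into a
PartitionerDescription by class name:

    // Routes by the length of a String payload; purely illustrative.
    class KeyLengthPartitioner extends UnicastPartitioner {
      override def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int = {
        msg.msg.asInstanceOf[String].length % partitionNum
      }
    }

    // Stores only the class name; the partitioner is re-created later via
    // Class.forName(...).newInstance(), so it must have a no-arg constructor.
    val description: PartitionerDescription = Partitioner[KeyLengthPartitioner]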

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
deleted file mode 100644
index ff962fa..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import scala.util.Random
-
-import io.gearpump.Message
-
-/**
- * The idea of ShuffleGroupingPartitioner is derived from Storm.
- * Messages are randomly distributed across the downstream tasks such that
- * each task is guaranteed to get an equal number of messages.
- */
-class ShuffleGroupingPartitioner extends UnicastPartitioner {
-  private val random = new Random
-  private var index = -1
-  private var partitions = List.empty[Int]
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    index += 1
-    if (partitions.isEmpty) {
-      partitions = 0.until(partitionNum).toList
-      partitions = random.shuffle(partitions)
-    } else if (index >= partitionNum) {
-      index = 0
-      partitions = random.shuffle(partitions)
-    }
-    partitions(index)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
deleted file mode 100644
index 6b3c26e..0000000
--- a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import java.util.Random
-
-import io.gearpump.Message
-
-/**
- * Round-robin partitions the data across downstream processor tasks.
- */
-class ShufflePartitioner extends UnicastPartitioner {
-  private var seed = 0
-  private var count = 0
-
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-
-    if (seed == 0) {
-      seed = newSeed()
-    }
-
-    val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
-    count = count + 1
-    result
-  }
-
-  private def newSeed(): Int = new Random().nextInt()
-}
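
A worked sketch of the round-robin arithmetic above (the seed value is chosen only for
illustration): once the random seed is fixed, consecutive calls cycle through the
partitions in order.

    // With seed = 7 and partitionNum = 3:
    //   count = 0 -> (0 + 7) % 3 = 1
    //   count = 1 -> (1 + 7) % 3 = 2
    //   count = 2 -> (2 + 7) % 3 = 0
    //   count = 3 -> (3 + 7) % 3 = 1   ... and so on, round robin.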

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/Authenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/Authenticator.scala b/core/src/main/scala/io/gearpump/security/Authenticator.scala
deleted file mode 100644
index 73bc8e1..0000000
--- a/core/src/main/scala/io/gearpump/security/Authenticator.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-import scala.concurrent.{ExecutionContext, Future}
-
-import io.gearpump.security.Authenticator.AuthenticationResult
-
-/**
- * Authenticator for UI dashboard.
- *
- * Subclasses must implement a constructor with a signature like this:
- * this(config: Config)
- */
-trait Authenticator {
-
-  // TODO: Change the signature to return more attributes of user credentials...
-  def authenticate(
-      user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
-}
-
-object Authenticator {
-
-  trait AuthenticationResult {
-
-    def authenticated: Boolean
-
-    def permissionLevel: Int
-  }
-
-  val UnAuthenticated = new AuthenticationResult {
-    override val authenticated = false
-    override val permissionLevel = -1
-  }
-
-  /** Guests can view, but have no permission to submit applications or write */
-  val Guest = new AuthenticationResult {
-    override val authenticated = true
-    override val permissionLevel = 1000
-  }
-
-  /** Users can submit and kill applications, but have no permission to add or remove machines */
-  val User = new AuthenticationResult {
-    override val authenticated = true
-    override val permissionLevel = 1000 + Guest.permissionLevel
-  }
-
-  /** Super user */
-  val Admin = new AuthenticationResult {
-    override val authenticated = true
-    override val permissionLevel = 1000 + User.permissionLevel
-  }
-}
\ No newline at end of file
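
The levels above are cumulative (UnAuthenticated = -1, Guest = 1000, User = 2000,
Admin = 3000), so callers can gate an action by comparing levels. A minimal sketch:

    // Rejects anything below User level (i.e. Guest or UnAuthenticated).
    def canSubmitApp(result: Authenticator.AuthenticationResult): Boolean = {
      result.authenticated &&
        result.permissionLevel >= Authenticator.User.permissionLevel
    }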

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
deleted file mode 100644
index 0743a3f..0000000
--- a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import com.typesafe.config.Config
-
-import io.gearpump.security.Authenticator.AuthenticationResult
-import io.gearpump.security.ConfigFileBasedAuthenticator._
-
-object ConfigFileBasedAuthenticator {
-
-  private val ROOT = "gearpump.ui-security.config-file-based-authenticator"
-  private val ADMINS = ROOT + "." + "admins"
-  private val USERS = ROOT + "." + "users"
-  private val GUESTS = ROOT + "." + "guests"
-
-  private case class Credentials(
-      admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) {
-
-    def verify(user: String, password: String): AuthenticationResult = {
-      if (admins.contains(user)) {
-        if (verify(user, password, admins)) {
-          Authenticator.Admin
-        } else {
-          Authenticator.UnAuthenticated
-        }
-      } else if (users.contains(user)) {
-        if (verify(user, password, users)) {
-          Authenticator.User
-        } else {
-          Authenticator.UnAuthenticated
-        }
-      } else if (guests.contains(user)) {
-        if (verify(user, password, guests)) {
-          Authenticator.Guest
-        } else {
-          Authenticator.UnAuthenticated
-        }
-      } else {
-        Authenticator.UnAuthenticated
-      }
-    }
-
-    private def verify(user: String, password: String, map: Map[String, String]): Boolean = {
-      val storedPass = map(user)
-      PasswordUtil.verify(password, storedPass)
-    }
-  }
-}
-
-/**
- * UI dashboard authenticator based on configuration file.
- *
- * It has three categories of users: admins, users, and guests.
- * Admins have unlimited permissions, such as shutting down a cluster or adding/removing machines.
- * Users have limited permissions, for example to submit an application.
- * Guests cannot submit/kill applications, but can view the application status.
- *
- * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find
- * information about how to configure this authenticator.
- *
- * [Security consideration]
- * It keeps a one-way SHA-1 digest of the password instead of the password itself. The original
- * password is NOT kept in any form, so generally it is safe.
- *
- *
- * digesting flow (from original password to digest):
- * {{{
- * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) ->
- * base64Encode.
- * }}}
- *
- * Verifying a user input password against the stored digest:
- * {{{
- * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest:
- * salt + sha1 -> compare the generated digest with the stored digest.
- * }}}
- */
-class ConfigFileBasedAuthenticator(config: Config) extends Authenticator {
-
-  private val credentials = loadCredentials(config)
-
-  override def authenticate(user: String, password: String, ec: ExecutionContext)
-    : Future[AuthenticationResult] = {
-    implicit val ctx = ec
-    Future {
-      credentials.verify(user, password)
-    }
-  }
-
-  private def loadCredentials(config: Config): Credentials = {
-    val admins = configToMap(config, ADMINS)
-    val users = configToMap(config, USERS)
-    val guests = configToMap(config, GUESTS)
-    new Credentials(admins, users, guests)
-  }
-
-  private def configToMap(config: Config, path: String) = {
-    import scala.collection.JavaConverters._
-    config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
-  }
-}
\ No newline at end of file
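
A hedged usage sketch of the authenticator above; the inline config stands in for the
gearpump.ui-security.config-file-based-authenticator section of gear.conf, and each value
must be a PasswordUtil digest:

    import scala.concurrent.ExecutionContext.Implicits.global
    import com.typesafe.config.ConfigFactory

    val digest = PasswordUtil.hash("admin-password")
    val config = ConfigFactory.parseString(
      s"""gearpump.ui-security.config-file-based-authenticator {
         |  admins = { "admin" = "$digest" }
         |  users = {}
         |  guests = {}
         |}""".stripMargin)

    val authenticator = new ConfigFileBasedAuthenticator(config)
    // Completes with Authenticator.Admin for the right password,
    // Authenticator.UnAuthenticated otherwise.
    val result = authenticator.authenticate("admin", "admin-password", global)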

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
deleted file mode 100644
index 9bf40d2..0000000
--- a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-
-import java.security.MessageDigest
-import scala.util.Try
-
-import sun.misc.{BASE64Decoder, BASE64Encoder}
-
-/**
- * Util to verify whether a user input password is valid.
- * It uses SHA-1 for digesting.
- */
-object PasswordUtil {
-  private val SALT_LENGTH = 8
-
-  /**
-   * Verifies a user input password against the stored digest:
-   * {{{
-   * base64Decode -> extract salt -> do sha1(salt, password) ->
-   * generate digest: salt + sha1 -> compare the generated digest with the stored digest.
-   * }}}
-   */
-  def verify(password: String, stored: String): Boolean = {
-    Try {
-      val decoded = base64Decode(stored)
-      val salt = new Array[Byte](SALT_LENGTH)
-      Array.copy(decoded, 0, salt, 0, SALT_LENGTH)
-
-      hash(password, salt) == stored
-    }.getOrElse(false)
-  }
-  /**
-   * digesting flow (from original password to digest):
-   * {{{
-   * random salt byte array of length 8 ->
-   * byte array of (salt + sha1(salt, password)) -> base64Encode
-   * }}}
-   */
-  def hash(password: String): String = {
-    // Salt generation 64 bits long
-    val salt = new Array[Byte](SALT_LENGTH)
-    new java.util.Random().nextBytes(salt)
-    hash(password, salt)
-  }
-
-  private def hash(password: String, salt: Array[Byte]): String = {
-    val digest = MessageDigest.getInstance("SHA-1")
-    digest.reset()
-    digest.update(salt)
-    var input = digest.digest(password.getBytes("UTF-8"))
-    digest.reset()
-    input = digest.digest(input)
-    val withSalt = salt ++ input
-    base64Encode(withSalt)
-  }
-
-  private def base64Encode(data: Array[Byte]): String = {
-    val endecoder = new BASE64Encoder()
-    endecoder.encode(data)
-  }
-
-  private def base64Decode(data: String): Array[Byte] = {
-    val decoder = new BASE64Decoder()
-    decoder.decodeBuffer(data)
-  }
-
-  // scalastyle:off println
-  private def help() = {
-    Console.println("usage: gear io.gearpump.security.PasswordUtil -password <your password>")
-  }
-
-  def main(args: Array[String]): Unit = {
-    if (args.length != 2 || args(0) != "-password") {
-      help()
-    } else {
-      val pass = args(1)
-      val result = hash(pass)
-      Console.println("Here is the hashed password")
-      Console.println("==============================")
-      Console.println(result)
-    }
-  }
-  // scalastyle:on println
-}
\ No newline at end of file
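
A small round-trip sketch of the utility above; the digest is salted with 8 random bytes,
so hashing the same password twice gives different strings, but verify() recovers the salt
from the stored value:

    val stored = PasswordUtil.hash("secret")    // persist this, e.g. in gear.conf
    PasswordUtil.verify("secret", stored)       // true
    PasswordUtil.verify("wrong-guess", stored)  // false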

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
deleted file mode 100644
index cb9d563..0000000
--- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import akka.actor.ExtendedActorSystem
-
-import io.gearpump.cluster.UserConfig
-
-/**
- * A built-in serialization framework using Kryo.
- *
- * NOTE: the Kryo here is a version shaded by Gearpump.
- */
-class FastKryoSerializationFramework extends SerializationFramework {
-  private var system: ExtendedActorSystem = null
-
-  private lazy val pool = new ThreadLocal[Serializer]() {
-    override def initialValue(): Serializer = {
-      new FastKryoSerializer(system)
-    }
-  }
-
-  override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {
-    this.system = system
-  }
-
-  override def get(): Serializer = {
-    pool.get()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
deleted file mode 100644
index 57b7b5e..0000000
--- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import akka.actor.ExtendedActorSystem
-
-import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
-import io.gearpump.objenesis.strategy.StdInstantiatorStrategy
-import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper
-import io.gearpump.serializer.FastKryoSerializer.KryoSerializationException
-import io.gearpump.util.LogUtil
-
-class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
-
-  private val LOG = LogUtil.getLogger(getClass)
-  private val config = system.settings.config
-
-  private val kryoSerializer = new KryoSerializerWrapper(system)
-  private val kryo = kryoSerializer.kryo
-  val strategy = new DefaultInstantiatorStrategy
-  strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy)
-  kryo.setInstantiatorStrategy(strategy)
-  private val kryoClazz = new GearpumpSerialization(config).customize(kryo)
-
-  override def serialize(message: Any): Array[Byte] = {
-    try {
-      kryoSerializer.toBinary(message)
-    } catch {
-      case ex: java.lang.IllegalArgumentException =>
-        val clazz = message.getClass
-        val error = s"""
-          | ${ex.getMessage}
-          |You can also register the class by providing a configuration with serializer
-          |defined,
-          |
-          |gearpump{
-          |  serializers {
-          |    ## Follow this format when adding new serializer for new message types
-          |    #    "yourpackage.YourClass" = "yourpackage.YourSerializerForThisClass"
-          |
-          |    ## If you intend to use default serializer for this class, then you can write this
-          |    #    "yourpackage.YourClass" = ""
-          |  }
-          |}
-          |
-          |If you want to register the serializer globally, you need to change
-          |gear.conf on every worker in the cluster; if you only want to register
-          |the serializer for a single streaming application, you need to create
-          |a file under conf/ named application.conf, and add the above configuration
-          |into application.conf. To verify whether the configuration is effective,
-          |you can browse your UI at http://{UI Server Host}:8090/api/v1.0/app/{appId}/config,
-          |and check whether your custom serializer is added.
-        """.stripMargin
-
-        LOG.error(error, ex)
-        throw new KryoSerializationException(error, ex)
-    }
-  }
-
-  override def deserialize(msg: Array[Byte]): Any = {
-    kryoSerializer.fromBinary(msg)
-  }
-}
-
-object FastKryoSerializer {
-  class KryoSerializationException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
deleted file mode 100644
index a7eb6cf..0000000
--- a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
-import io.gearpump.util.{Constants, LogUtil}
-
-class GearpumpSerialization(config: Config) {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def customize(kryo: Kryo): Unit = {
-
-    val serializationMap = configToMap(config, Constants.GEARPUMP_SERIALIZERS)
-
-    serializationMap.foreach { kv =>
-      val (key, value) = kv
-      val keyClass = Class.forName(key)
-
-      if (value == null || value.isEmpty) {
-
-        // Use default serializer for this class type
-        kryo.register(keyClass)
-      } else {
-        val valueClass = Class.forName(value)
-        val register = kryo.register(keyClass,
-          valueClass.newInstance().asInstanceOf[KryoSerializer[_]])
-        LOG.debug(s"Registering ${keyClass}, id: ${register.getId}")
-      }
-    }
-    kryo.setReferences(false)
-
-    // Requires the user to register the class first before using
-    kryo.setRegistrationRequired(true)
-  }
-
-  private final def configToMap(config: Config, path: String) = {
-    import scala.collection.JavaConverters._
-    config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
deleted file mode 100644
index 4947dcc..0000000
--- a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import akka.actor.ExtendedActorSystem
-
-import io.gearpump.cluster.UserConfig
-
-/**
- * Users are allowed to plug in a customized serialization framework by extending
- * this interface.
- */
-trait SerializationFramework {
-  def init(system: ExtendedActorSystem, config: UserConfig)
-
-  /**
-   * Gets a serializer to use.
-   *
-   * Note: this method can be called from multiple threads. It is the
-   * responsibility of the SerializationFramework developer to ensure this
-   * method is thread safe.
-   *
-   * One recommendation for thread safety is to use a thread-local pool that
-   * maintains one Serializer per thread.
-   *
-   * @return a Serializer that is safe to use on the calling thread
-   */
-  def get(): Serializer
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/Serializer.scala b/core/src/main/scala/io/gearpump/serializer/Serializer.scala
deleted file mode 100644
index ff8b147..0000000
--- a/core/src/main/scala/io/gearpump/serializer/Serializer.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-/**
- * User defined message serializer
- */
-trait Serializer {
-  def serialize(message: Any): Array[Byte]
-
-  def deserialize(msg: Array[Byte]): Any
-}
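
A hedged sketch of a custom Serializer, plus a SerializationFramework that follows the
thread-local pooling recommendation above. It uses plain Java serialization instead of the
built-in Kryo path and is not part of Gearpump itself:

    import java.io._

    import akka.actor.ExtendedActorSystem

    import io.gearpump.cluster.UserConfig

    // Hypothetical example only.
    class JavaSerializer extends Serializer {
      override def serialize(message: Any): Array[Byte] = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(message)
        out.close()
        bytes.toByteArray
      }

      override def deserialize(msg: Array[Byte]): Any = {
        new ObjectInputStream(new ByteArrayInputStream(msg)).readObject()
      }
    }

    class JavaSerializationFramework extends SerializationFramework {
      // One Serializer per thread, per the thread-safety note on get().
      private lazy val pool = new ThreadLocal[Serializer]() {
        override def initialValue(): Serializer = new JavaSerializer
      }
      override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {}
      override def get(): Serializer = pool.get()
    }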

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/Express.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/Express.scala b/core/src/main/scala/io/gearpump/transport/Express.scala
deleted file mode 100644
index 101b841..0000000
--- a/core/src/main/scala/io/gearpump/transport/Express.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport
-
-import scala.collection.immutable.LongMap
-import scala.concurrent._
-
-import akka.actor._
-import akka.agent.Agent
-import org.slf4j.Logger
-
-import io.gearpump.transport.netty.Client.Close
-import io.gearpump.transport.netty.{Context, TaskMessage}
-import io.gearpump.util.LogUtil
-
-trait ActorLookupById {
-
-  /** Lookup actor ref for local task actor by providing a TaskId (TaskId.toLong) */
-  def lookupLocalActor(id: Long): Option[ActorRef]
-}
-
-/**
- * Custom networking layer.
- *
- * It translates long sender/receiver addresses into shorter ones to reduce
- * network overhead.
- */
-class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById {
-
-  import system.dispatcher
-
-  import io.gearpump.transport.Express._
-  val localActorMap = Agent(LongMap.empty[ActorRef])
-  val remoteAddressMap = Agent(Map.empty[Long, HostPort])
-
-  val remoteClientMap = Agent(Map.empty[HostPort, ActorRef])
-
-  val conf = system.settings.config
-
-  lazy val (context, serverPort, localHost) = init
-
-  lazy val init = {
-    LOG.info(s"Start Express init ...${system.name}")
-    val context = new Context(system, conf)
-    val serverPort = context.bind("netty-server", this)
-    val localHost = HostPort(system.provider.getDefaultAddress.host.get, serverPort)
-    LOG.info(s"binding to netty server $localHost")
-
-    system.registerOnTermination(new Runnable {
-      override def run(): Unit = context.close()
-    })
-    (context, serverPort, localHost)
-  }
-
-  def unregisterLocalActor(id: Long): Unit = {
-    localActorMap.sendOff(_ - id)
-  }
-
-  /** Start Netty client actors to connect to remote machines */
-  def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
-    val clientsToClose = remoteClientMap.get().filterKeys(!hostPorts.contains(_)).keySet
-    closeClients(clientsToClose)
-    hostPorts.toList.foldLeft(Future(Map.empty[HostPort, ActorRef])) { (future, hostPort) =>
-      remoteClientMap.alter { map =>
-        if (!map.contains(hostPort)) {
-          val actor = context.connect(hostPort)
-          map + (hostPort -> actor)
-        } else {
-          map
-        }
-      }
-    }
-  }
-
-  def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
-    remoteClientMap.alter { map =>
-      map.filterKeys(hostPorts.contains).foreach { hostAndClient =>
-        val (_, client) = hostAndClient
-        client ! Close
-      }
-      map -- hostPorts
-    }
-  }
-
-  def registerLocalActor(id: Long, actor: ActorRef): Unit = {
-    LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}")
-    init
-    localActorMap.sendOff(_ + (id -> actor))
-  }
-
-  def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id)
-
-  def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id)
-
-  /** Send message to remote task */
-  def transport(taskMessage: TaskMessage, remote: HostPort): Unit = {
-
-    val remoteClient = remoteClientMap.get.get(remote)
-    if (remoteClient.isDefined) {
-      remoteClient.get.tell(taskMessage, Actor.noSender)
-    } else {
-      val errorMsg = s"Clients have not been launched properly before transporting messages, " +
-        s"the destination is $remote"
-      LOG.error(errorMsg)
-      throw new Exception(errorMsg)
-    }
-  }
-}
-
-/** A customized transport layer by using Akka extension */
-object Express extends ExtensionId[Express] with ExtensionIdProvider {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override def get(system: ActorSystem): Express = super.get(system)
-
-  override def lookup: ExtensionId[Express] = Express
-
-  override def createExtension(system: ExtendedActorSystem): Express = new Express(system)
-}
\ No newline at end of file
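
A minimal usage sketch of the extension above; taskIdAsLong, taskActorRef, taskMessage, and
remoteHostPort are placeholders, and the remote side is assumed to have registered its own
local actor:

    val express = Express(system)   // Akka extension lookup, lazily binds the Netty server
    express.registerLocalActor(taskIdAsLong, taskActorRef)

    // Sender side: open clients first, then transport messages to the peer.
    express.startClients(Set(remoteHostPort))
    express.transport(taskMessage, remoteHostPort)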

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/HostPort.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/HostPort.scala b/core/src/main/scala/io/gearpump/transport/HostPort.scala
deleted file mode 100644
index 40c4342..0000000
--- a/core/src/main/scala/io/gearpump/transport/HostPort.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport
-
-case class HostPort(host: String, port: Int) {
-  def toTuple: (String, Int) = {
-    (host, port)
-  }
-}
-
-object HostPort {
-  def apply(address: String): HostPort = {
-    val hostAndPort = address.split(":")
-    new HostPort(hostAndPort(0), hostAndPort(1).toInt)
-  }
-}
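
For reference, the companion object above also parses the "host:port" string form (sketch):

    val endpoint = HostPort("127.0.0.1:3000")   // HostPort("127.0.0.1", 3000)
    val (host, port) = endpoint.toTuple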

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Client.scala b/core/src/main/scala/io/gearpump/transport/netty/Client.scala
deleted file mode 100644
index d5960ad..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/Client.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import java.net.{ConnectException, InetSocketAddress}
-import java.nio.channels.ClosedChannelException
-import java.util
-import java.util.Random
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
-
-import akka.actor.Actor
-import org.jboss.netty.bootstrap.ClientBootstrap
-import org.jboss.netty.channel._
-import org.slf4j.Logger
-
-import io.gearpump.transport.HostPort
-import io.gearpump.util.LogUtil
-
-/**
- * Netty client implemented as an actor; on the other side there is a Netty server actor.
- * All messages sent to this actor will be forwarded to the remote machine.
- */
-class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor {
-  import io.gearpump.transport.netty.Client._
-
-  val name = s"netty-client-$hostPort"
-
-  private final var bootstrap: ClientBootstrap = null
-  private final val random: Random = new Random
-  private val serializer = conf.newTransportSerializer
-  private var channel: Channel = null
-
-  var batch = new util.ArrayList[TaskMessage]
-
-  private val init = {
-    bootstrap = NettyUtil.createClientBootStrap(factory,
-      new ClientPipelineFactory(name, conf), conf.buffer_size)
-    self ! Connect(0)
-  }
-
-  def receive: Receive = messageHandler orElse connectionHandler
-
-  def messageHandler: Receive = {
-    case msg: TaskMessage =>
-      batch.add(msg)
-    case flush@Flush(flushChannel) =>
-      if (channel != flushChannel) {
-        Unit // Drop, as it belongs to an old channel's flush message
-      } else if (batch.size > 0 && flushChannel.isWritable) {
-        send(flushChannel, batch.iterator)
-        batch.clear()
-        self ! flush
-      } else {
-        import context.dispatcher
-        context.system.scheduler.scheduleOnce(
-          new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
-      }
-  }
-
-  def connectionHandler: Receive = {
-    case ChannelReady(channel) =>
-      this.channel = channel
-      self ! Flush(channel)
-    case Connect(tries) =>
-      if (null == channel) {
-        connect(tries)
-      } else {
-        LOG.error("there already exist a channel, will not establish a new one...")
-      }
-    case CompareAndReconnectIfEqual(oldChannel) =>
-      if (oldChannel == channel) {
-        channel = null
-        self ! Connect(0)
-      }
-    case Close =>
-      close()
-      context.become(closed)
-  }
-
-  def closed: Receive = {
-    case msg: AnyRef =>
-      LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...")
-  }
-
-  private def connect(tries: Int): Unit = {
-    LOG.info(s"netty client try to connect to $name, tries: $tries")
-    if (tries <= conf.max_retries) {
-      val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port)
-      val future = bootstrap.connect(remote_addr)
-      future success { current =>
-        LOG.info(s"netty client successfully connectted to $name, tries: $tries")
-        self ! ChannelReady(current)
-      } fail { (current, ex) =>
-        LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}")
-        current.close()
-        import context.dispatcher
-        context.system.scheduler.scheduleOnce(
-          new FiniteDuration(
-            getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
-      }
-    } else {
-      LOG.error(s"fail to connect to a remote host $name after retied $tries ...")
-      self ! Close
-    }
-  }
-
-  private def send(flushChannel: Channel, msgs: util.Iterator[TaskMessage]) {
-    var messageBatch: MessageBatch = null
-
-    while (msgs.hasNext) {
-      val message: TaskMessage = msgs.next()
-      if (null == messageBatch) {
-        messageBatch = new MessageBatch(conf.messageBatchSize, serializer)
-      }
-      messageBatch.add(message)
-      if (messageBatch.isFull) {
-        val toBeFlushed: MessageBatch = messageBatch
-        flushRequest(flushChannel, toBeFlushed)
-        messageBatch = null
-      }
-    }
-    if (null != messageBatch && !messageBatch.isEmpty) {
-      flushRequest(flushChannel, messageBatch)
-    }
-  }
-
-  private def close() {
-    LOG.info(s"closing netty client $name...")
-    if (null != channel) {
-      channel.close()
-      channel = null
-    }
-    batch = null
-  }
-
-  override def postStop(): Unit = {
-    close()
-  }
-
-  private def flushRequest(channel: Channel, requests: MessageBatch) {
-    val future: ChannelFuture = channel.write(requests)
-    future.fail { (channel, ex) =>
-      if (channel.isOpen) {
-        channel.close
-      }
-      LOG.error(s"failed to send requests " +
-        s"to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}")
-      if (!ex.isInstanceOf[ClosedChannelException]) {
-        LOG.error(ex.getMessage, ex)
-      }
-      self ! CompareAndReconnectIfEqual(channel)
-    }
-  }
-
-  private def getSleepTimeMs(retries: Int): Long = {
-    if (retries > 30) {
-      conf.max_sleep_ms
-    } else {
-      val backoff = 1 << retries
-      val sleepMs = conf.base_sleep_ms * Math.max(1, random.nextInt(backoff))
-      if (sleepMs < conf.max_sleep_ms) sleepMs else conf.max_sleep_ms
-    }
-  }
-
-  private def isChannelWritable = (null != channel) && channel.isWritable
-}
-
-object Client {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  // Reconnect only if the current channel still equals the given (old) channel
-  case class CompareAndReconnectIfEqual(channel: Channel)
-
-  case class Connect(tries: Int)
-  case class ChannelReady(channel: Channel)
-  case object Close
-
-  case class Flush(channel: Channel)
-
-  class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler {
-
-    override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) {
-      event.getCause match {
-        case ex: ConnectException => Unit
-        case ex: ClosedChannelException =>
-          LOG.warn("exception found when trying to close netty connection", ex.getMessage)
-        case ex => LOG.error("Connection failed " + name, ex)
-      }
-    }
-  }
-
-  class ClientPipelineFactory(name: String, conf: NettyConfig) extends ChannelPipelineFactory {
-    def getPipeline: ChannelPipeline = {
-      val pipeline: ChannelPipeline = Channels.pipeline
-      pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
-      pipeline.addLast("encoder", new MessageEncoder)
-      pipeline.addLast("handler", new ClientErrorHandler(name))
-      pipeline
-    }
-  }
-
-  implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = {
-    new ChannelFutureOps(channel)
-  }
-
-  class ChannelFutureOps(channelFuture: ChannelFuture) {
-
-    def success(handler: (Channel => Unit)): ChannelFuture = {
-      channelFuture.addListener(new ChannelFutureListener {
-        def operationComplete(future: ChannelFuture) {
-          if (future.isSuccess) {
-            handler(future.getChannel)
-          }
-        }
-      })
-      channelFuture
-    }
-
-    def fail(handler: ((Channel, Throwable) => Unit)): ChannelFuture = {
-      channelFuture.addListener(new ChannelFutureListener {
-        def operationComplete(future: ChannelFuture) {
-          if (!future.isSuccess) {
-            handler(future.getChannel, future.getCause)
-          }
-        }
-      })
-      channelFuture
-    }
-  }
-}
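
The reconnect delay in getSleepTimeMs above is a capped, randomized exponential backoff. A
standalone sketch of the same arithmetic, with base and max values assumed only for
illustration:

    // retries = 1 -> backoff = 2,  sleep in [base, 2 * base] ms
    // retries = 4 -> backoff = 16, sleep in [base, 16 * base] ms
    // retries > 30, or once the product exceeds the cap -> maxMs
    def sleepTimeMs(retries: Int, baseMs: Long, maxMs: Long, rnd: java.util.Random): Long = {
      if (retries > 30) maxMs
      else math.min(maxMs, baseMs * math.max(1, rnd.nextInt(1 << retries)))
    }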

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Context.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Context.scala b/core/src/main/scala/io/gearpump/transport/netty/Context.scala
deleted file mode 100644
index 9a9ee29..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/Context.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import java.io.Closeable
-import java.util.concurrent._
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.Config
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.slf4j.Logger
-
-import io.gearpump.transport.netty.Server.ServerPipelineFactory
-import io.gearpump.transport.{ActorLookupById, HostPort}
-import io.gearpump.util.{Constants, LogUtil}
-
-object Context {
-  private final val LOG: Logger = LogUtil.getLogger(getClass)
-}
-
-/** Netty Context */
-class Context(system: ActorSystem, conf: NettyConfig) extends IContext {
-  import io.gearpump.transport.netty.Context._
-
-  def this(system: ActorSystem, conf: Config) {
-    this(system, new NettyConfig(conf))
-  }
-
-  private val closeHandler = new ConcurrentLinkedQueue[Closeable]()
-  private val nettyDispatcher = system.settings.config.getString(Constants.NETTY_DISPATCHER)
-  val maxWorkers: Int = 1
-
-  private lazy val clientChannelFactory: NioClientSocketChannelFactory = {
-    val bossFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-boss")
-    val workerFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-worker")
-    val channelFactory =
-      new NioClientSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory), maxWorkers)
-
-    closeHandler.add(new Closeable {
-      override def close(): Unit = {
-        LOG.info("Closing all client resources....")
-        channelFactory.releaseExternalResources
-      }
-    })
-    channelFactory
-  }
-
-  def bind(
-      name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = true,
-      inputPort: Int = 0): Int = {
-    // TODO: should this be exposed as an application config option?
-    val server = system.actorOf(Props(classOf[Server], name, conf, lookupActor,
-      deserializeFlag).withDispatcher(nettyDispatcher), name)
-    val (port, channel) = NettyUtil.newNettyServer(name,
-      new ServerPipelineFactory(server, conf), 5242880, inputPort)
-    val factory = channel.getFactory
-    closeHandler.add(new Closeable {
-      override def close(): Unit = {
-        system.stop(server)
-        channel.close()
-        LOG.info("Closing all server resources....")
-        factory.releaseExternalResources
-      }
-    })
-    port
-  }
-
-  def connect(hostPort: HostPort): ActorRef = {
-    val client = system.actorOf(Props(classOf[Client], conf, clientChannelFactory, hostPort)
-      .withDispatcher(nettyDispatcher))
-    closeHandler.add(new Closeable {
-      override def close(): Unit = {
-        LOG.info("closing Client actor....")
-        system.stop(client)
-      }
-    })
-
-    client
-  }
-
-  /**
-   * terminate this context
-   */
-  def close(): Unit = {
-
-    LOG.info(s"Context.term, cleanup resources...., " +
-      s"we have ${closeHandler.size()} items to close...")
-
-    // Cleans up resources in reverse order so that the client actor is cleaned up
-    // before the shared clientChannelFactory
-    closeHandler.iterator().asScala.toList.reverse.foreach(_.close())
-  }
-}
-

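Context.scala above is the entry point of the Netty transport layer: bind starts a Server actor and returns the listening port, connect returns a Client actor per remote HostPort, and close releases every registered resource in reverse order. A minimal lifecycle sketch, assuming the ActorSystem was created with Gearpump's cluster config so the netty dispatcher and buffer settings resolve; the lookup argument stands for whatever maps task ids to local task actors in a running application:

  import akka.actor.{ActorRef, ActorSystem}

  import io.gearpump.cluster.ClusterConfig
  import io.gearpump.transport.{ActorLookupById, HostPort}
  import io.gearpump.transport.netty.Context

  object TransportLifecycleSketch {
    def run(system: ActorSystem, lookup: ActorLookupById): Unit = {
      val context = new Context(system, ClusterConfig.default())

      // bind starts the Server actor and returns the port actually chosen
      // (inputPort defaults to 0, i.e. an ephemeral port).
      val port = context.bind("transport-sketch", lookup)

      // connect returns a Client actor that owns the outgoing connection.
      val client: ActorRef = context.connect(HostPort("127.0.0.1", port))

      // close releases everything registered above in reverse order, so the
      // Client actor is stopped before the shared client channel factory.
      context.close()
    }
  }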
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala b/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
deleted file mode 100644
index 56b2f7c..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import akka.actor.ActorRef
-
-import io.gearpump.transport.{ActorLookupById, HostPort}
-
-trait IContext {
-
-  /**
-   * Creates a Netty server and returns the port it is bound to.
-   */
-  def bind(name: String, lookupActor: ActorLookupById, deserializeFlag: Boolean, port: Int): Int
-
-  /**
-   * Creates a Netty client actor for the given host and port.
-   */
-  def connect(hostPort: HostPort): ActorRef
-
-  /**
-   * Closes all resources owned by this context.
-   */
-  def close()
-}
\ No newline at end of file

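IContext above is the seam that allows the Netty transport to be swapped out, for example in tests. A hypothetical in-memory stub, assuming akka-testkit is on the test classpath; no sockets are opened, and connect simply hands back a probe reference so a test can assert on the messages that would otherwise have gone over the wire:

  import akka.actor.{ActorRef, ActorSystem}
  import akka.testkit.TestProbe

  import io.gearpump.transport.{ActorLookupById, HostPort}
  import io.gearpump.transport.netty.IContext

  class StubTransportContext(system: ActorSystem) extends IContext {
    val probe = TestProbe()(system)

    // Pretend to bind: just echo the requested port back.
    override def bind(name: String, lookupActor: ActorLookupById,
        deserializeFlag: Boolean, port: Int): Int = port

    // Every "remote" connection resolves to the same local probe.
    override def connect(hostPort: HostPort): ActorRef = probe.ref

    override def close(): Unit = ()
  }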
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala b/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
deleted file mode 100644
index a62eff5..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import com.typesafe.config.Config
-
-import io.gearpump.util.Constants
-
-class NettyConfig(conf: Config) {
-
-  val buffer_size = conf.getInt(Constants.NETTY_BUFFER_SIZE)
-  val max_retries = conf.getInt(Constants.NETTY_MAX_RETRIES)
-  val base_sleep_ms = conf.getInt(Constants.NETTY_BASE_SLEEP_MS)
-  val max_sleep_ms = conf.getInt(Constants.NETTY_MAX_SLEEP_MS)
-  val messageBatchSize = conf.getInt(Constants.NETTY_MESSAGE_BATCH_SIZE)
-  val flushCheckInterval = conf.getInt(Constants.NETTY_FLUSH_CHECK_INTERVAL)
-
-  def newTransportSerializer: ITransportMessageSerializer = {
-    Class.forName(
-      conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER))
-      .newInstance().asInstanceOf[ITransportMessageSerializer]
-  }
-}

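NettyConfig above snapshots the transport-related settings out of a Typesafe Config and instantiates the configured serializer reflectively via Constants.GEARPUMP_TRANSPORT_SERIALIZER. A small sketch constructing it from the default cluster config and reading a few of the fields defined above:

  import io.gearpump.cluster.ClusterConfig
  import io.gearpump.transport.netty.NettyConfig

  object NettyConfigSketch extends App {
    // The keys are resolved through Constants, so the config returned by
    // ClusterConfig.default() is expected to carry all of them.
    val nettyConf = new NettyConfig(ClusterConfig.default())

    println(s"buffer size    : ${nettyConf.buffer_size}")
    println(s"max retries    : ${nettyConf.max_retries}")
    println(s"batch size     : ${nettyConf.messageBatchSize}")
    println(s"flush interval : ${nettyConf.flushCheckInterval}")

    // Reflectively creates the serializer class named in the config.
    val serializer = nettyConf.newTransportSerializer
  }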
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala b/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
deleted file mode 100644
index 3e746af..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, ThreadFactory}
-
-import org.jboss.netty.bootstrap.{ClientBootstrap, ServerBootstrap}
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipelineFactory}
-
-object NettyUtil {
-
-  def newNettyServer(
-      name: String,
-      pipelineFactory: ChannelPipelineFactory,
-      buffer_size: Int,
-      inputPort: Int = 0): (Int, Channel) = {
-    val bossFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-boss")
-    val workerFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-worker")
-    val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-      Executors.newCachedThreadPool(workerFactory), 1)
-
-    val bootstrap = createServerBootStrap(factory, pipelineFactory, buffer_size)
-    val channel: Channel = bootstrap.bind(new InetSocketAddress(inputPort))
-    val port = channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort()
-    (port, channel)
-  }
-
-  def createServerBootStrap(
-      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
-    : ServerBootstrap = {
-    val bootstrap = new ServerBootstrap(factory)
-    bootstrap.setOption("child.tcpNoDelay", true)
-    bootstrap.setOption("child.receiveBufferSize", buffer_size)
-    bootstrap.setOption("child.keepAlive", true)
-    bootstrap.setPipelineFactory(pipelineFactory)
-    bootstrap
-  }
-
-  def createClientBootStrap(
-      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
-    : ClientBootstrap = {
-    val bootstrap = new ClientBootstrap(factory)
-    bootstrap.setOption("tcpNoDelay", true)
-    bootstrap.setOption("sendBufferSize", buffer_size)
-    bootstrap.setOption("keepAlive", true)
-    bootstrap.setPipelineFactory(pipelineFactory)
-    bootstrap
-  }
-}

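NettyUtil above concentrates the Netty 3 bootstrap boilerplate: newNettyServer creates the boss/worker thread pools, binds, and reports the port actually chosen, while the two bootstrap helpers differ only in which buffer-size option they set. A short sketch binding an ephemeral-port server with the pipeline factory from Server; the 5242880 buffer size mirrors the value Context.bind passes:

  import akka.actor.ActorRef

  import io.gearpump.transport.netty.{NettyConfig, NettyUtil}
  import io.gearpump.transport.netty.Server.ServerPipelineFactory

  object NettyServerSketch {
    // server is an already-started Server actor; conf carries the transport
    // settings. inputPort = 0 lets the OS pick a free port, which is read
    // back from the bound channel's local address.
    def bindEphemeral(server: ActorRef, conf: NettyConfig): Int = {
      val (port, _) = NettyUtil.newNettyServer(
        "sketch-server", new ServerPipelineFactory(server, conf),
        buffer_size = 5242880, inputPort = 0)
      port
    }
  }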
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Server.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Server.scala b/core/src/main/scala/io/gearpump/transport/netty/Server.scala
deleted file mode 100644
index 9a9d79b..0000000
--- a/core/src/main/scala/io/gearpump/transport/netty/Server.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty
-
-import java.util
-import scala.collection.JavaConverters._
-import scala.collection.immutable.IntMap
-import scala.concurrent.Future
-
-import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem}
-import org.jboss.netty.channel._
-import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}
-import org.slf4j.Logger
-
-import io.gearpump.transport.ActorLookupById
-import io.gearpump.util.{AkkaHelper, LogUtil}
-
-/** Netty server actor; received messages are forwarded to the target task actor resolved by id. */
-class Server(
-    name: String, conf: NettyConfig, lookupActor: ActorLookupById, deserializeFlag: Boolean)
-  extends Actor {
-
-  private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = name)
-  import io.gearpump.transport.netty.Server._
-
-  val allChannels: ChannelGroup = new DefaultChannelGroup("gearpump-server")
-
-  val system = context.system.asInstanceOf[ExtendedActorSystem]
-
-  def receive: Receive = msgHandler orElse channelManager
-  // As we will only transfer TaskId on the wire,
-  // this object will translate taskId to or from ActorRef
-  private val taskIdActorRefTranslation = new TaskIdActorRefTranslation(context)
-
-  def channelManager: Receive = {
-    case AddChannel(channel) => allChannels.add(channel)
-    case CloseChannel(channel) =>
-      import context.dispatcher
-      Future {
-        channel.close.awaitUninterruptibly
-        allChannels.remove(channel)
-      }
-  }
-
-  def msgHandler: Receive = {
-    case MsgBatch(msgs) =>
-      msgs.asScala.groupBy(_.targetTask()).foreach { taskBatch =>
-        val (taskId, taskMessages) = taskBatch
-        val actor = lookupActor.lookupLocalActor(taskId)
-
-        if (actor.isEmpty) {
-          LOG.error(s"Cannot find actor for id: $taskId...")
-        } else taskMessages.foreach { taskMessage =>
-          actor.get.tell(taskMessage.message(),
-            taskIdActorRefTranslation.translateToActorRef(taskMessage.sessionId()))
-        }
-      }
-  }
-
-  override def postStop(): Unit = {
-    allChannels.close.awaitUninterruptibly
-  }
-}
-
-object Server {
-
-  class ServerPipelineFactory(server: ActorRef, conf: NettyConfig) extends ChannelPipelineFactory {
-    def getPipeline: ChannelPipeline = {
-      val pipeline: ChannelPipeline = Channels.pipeline
-      pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
-      pipeline.addLast("encoder", new MessageEncoder)
-      pipeline.addLast("handler", new ServerHandler(server))
-      pipeline
-    }
-  }
-
-  class ServerHandler(server: ActorRef) extends SimpleChannelUpstreamHandler {
-    private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = server.path.name)
-
-    override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
-      server ! AddChannel(e.getChannel)
-    }
-
-    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
-      val msgs: util.List[TaskMessage] = e.getMessage.asInstanceOf[util.List[TaskMessage]]
-      if (msgs != null) {
-        server ! MsgBatch(msgs)
-      }
-    }
-
-    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
-      LOG.error("server errors in handling the request", e.getCause)
-      server ! CloseChannel(e.getChannel)
-    }
-  }
-
-  class TaskIdActorRefTranslation(context: ActorContext) {
-    private var taskIdtoActorRef = IntMap.empty[ActorRef]
-
-    /** 1-1 mapping from session id to fake ActorRef */
-    def translateToActorRef(sessionId: Int): ActorRef = {
-      if (!taskIdtoActorRef.contains(sessionId)) {
-
-        // A fake ActorRef for performance optimization.
-        val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId")
-        taskIdtoActorRef += sessionId -> actorRef
-      }
-      taskIdtoActorRef.get(sessionId).get
-    }
-  }
-
-  case class AddChannel(channel: Channel)
-
-  case class CloseChannel(channel: Channel)
-
-  case class MsgBatch(messages: java.lang.Iterable[TaskMessage])
-
-}
\ No newline at end of file

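Server.scala above stays deliberately thin: ServerHandler only turns channel events into AddChannel, MsgBatch and CloseChannel messages, and the actor groups each batch by target task before forwarding it. A sketch of wiring it up by hand, mirroring what Context.bind does internally; the buffer size is the value Context passes, and the lookup argument is whatever resolves task ids to local actors:

  import akka.actor.{ActorRef, ActorSystem, Props}

  import io.gearpump.transport.ActorLookupById
  import io.gearpump.transport.netty.{NettyConfig, NettyUtil, Server}
  import io.gearpump.transport.netty.Server.ServerPipelineFactory

  object ServerWiringSketch {
    def start(system: ActorSystem, conf: NettyConfig,
        lookup: ActorLookupById, name: String): (Int, ActorRef) = {
      // Start the Server actor first, then hand its ActorRef to the pipeline
      // factory so ServerHandler can forward channel events to it.
      val server = system.actorOf(
        Props(classOf[Server], name, conf, lookup, true), name) // true = deserializeFlag

      val (port, _) = NettyUtil.newNettyServer(
        name, new ServerPipelineFactory(server, conf), 5242880)
      (port, server)
    }
  }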
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala b/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
deleted file mode 100644
index 25a34d9..0000000
--- a/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.util.concurrent.{TimeUnit, TimeoutException}
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-
-import akka.actor._
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.util.LogUtil.ProcessType
-
-/**
- * ActorSystemBooter is started in a new JVM process to boot an actor system.
- * All executors are started by ActorSystemBooter.
- *
- * It sends the actor system's address to the "report back" actor.
- */
-class ActorSystemBooter(config: Config) {
-  import io.gearpump.util.ActorSystemBooter._
-
-  def boot(name: String, reportBackActor: String): ActorSystem = {
-    val system = ActorSystem(name, config)
-    // Daemon path: http://{system}@{ip}:{port}/daemon
-    system.actorOf(Props(classOf[Daemon], name, reportBackActor), "daemon")
-    system
-  }
-}
-
-object ActorSystemBooter {
-
-  def apply(config: Config): ActorSystemBooter = new ActorSystemBooter(config)
-
-  def main(args: Array[String]) {
-    val name = args(0)
-    val reportBack = args(1)
-    val config = ClusterConfig.default()
-
-    LogUtil.loadConfiguration(config, ProcessType.APPLICATION)
-
-    val debugPort = Option(System.getProperty(Constants.GEARPUMP_REMOTE_DEBUG_PORT))
-    debugPort.foreach { port =>
-      val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
-      LOG.info("==========================================")
-      LOG.info("Remote debug port: " + port)
-      LOG.info("==========================================")
-    }
-
-    val system = apply(config).boot(name, reportBack)
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
-        LOG.info("Maybe we have received a SIGINT signal from parent process, " +
-          "start to cleanup resources....")
-        system.terminate()
-      }
-    })
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  case class BindLifeCycle(actor: ActorRef)
-  case class CreateActor(prop: Props, name: String)
-  case class ActorCreated(actor: ActorRef, name: String)
-  case class CreateActorFailed(name: String, reason: Throwable)
-
-  case class RegisterActorSystem(systemPath: String)
-
-  /**
-   * This actor system will watch its parent;
-   * if the parent dies, this actor system will also terminate.
-   */
-  case class ActorSystemRegistered(bindLifeWith: ActorRef)
-  case class RegisterActorSystemFailed(reason: Throwable)
-
-  object RegisterActorSystemTimeOut
-
-  class Daemon(val name: String, reportBack: String) extends Actor {
-    val LOG: Logger = LogUtil.getLogger(getClass, context = name)
-
-    val username = Option(System.getProperty(Constants.GEARPUMP_USERNAME)).getOrElse("not_defined")
-    LOG.info(s"RegisterActorSystem to ${reportBack}, current user: $username")
-
-    val reportBackActor = context.actorSelection(reportBack)
-    reportBackActor ! RegisterActorSystem(ActorUtil.getSystemAddress(context.system).toString)
-
-    implicit val executionContext = context.dispatcher
-    val timeout = context.system.scheduler.scheduleOnce(Duration(25, TimeUnit.SECONDS),
-      self, RegisterActorSystemFailed(new TimeoutException))
-
-    context.become(waitForRegisterResult)
-
-    def receive: Receive = null
-
-    def waitForRegisterResult: Receive = {
-      case ActorSystemRegistered(parent) =>
-        timeout.cancel()
-        context.watch(parent)
-        context.become(waitCommand)
-      case RegisterActorSystemFailed(ex) =>
-        LOG.error("RegisterActorSystemFailed", ex)
-        timeout.cancel()
-        context.stop(self)
-    }
-
-    def waitCommand: Receive = {
-      case BindLifeCycle(actor) =>
-        LOG.info(s"ActorSystem $name Binding life cycle with actor: $actor")
-        context.watch(actor)
-      case create@CreateActor(props: Props, name: String) =>
-        LOG.info(s"creating actor $name")
-        val actor = Try(context.actorOf(props, name))
-        actor match {
-          case Success(actor) =>
-            sender ! ActorCreated(actor, name)
-          case Failure(e) =>
-            sender ! CreateActorFailed(props.clazz.getName, e)
-        }
-      case PoisonPill =>
-        context.stop(self)
-      case Terminated(actor) =>
-        LOG.info(s"System $name Watched actor is terminated $actor")
-        context.stop(self)
-    }
-
-    override def postStop(): Unit = {
-      LOG.info(s"ActorSystem $name is shutting down...")
-      context.system.terminate()
-    }
-  }
-}

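The Daemon above implements a small handshake: it reports its actor system to the "report back" address, waits up to 25 seconds for ActorSystemRegistered, and afterwards accepts BindLifeCycle and CreateActor commands. A sketch of the parent side of that protocol; the parent class and the Props being launched are illustrative:

  import akka.actor.{Actor, ActorLogging, Props}

  import io.gearpump.util.ActorSystemBooter._

  // Hypothetical parent: registers the remote system against itself, then
  // asks the Daemon to create one actor in the freshly booted system.
  class BooterParent(executorProps: Props) extends Actor with ActorLogging {
    override def receive: Receive = {
      case RegisterActorSystem(systemPath) =>
        log.info(s"actor system registered from $systemPath")
        // sender() is the Daemon; binding its life cycle to this actor means
        // the remote system terminates if this parent dies.
        sender() ! ActorSystemRegistered(self)
        sender() ! CreateActor(executorProps, "executor-0")

      case ActorCreated(actor, name) =>
        log.info(s"actor $name created at $actor")

      case CreateActorFailed(name, reason) =>
        log.error(reason, s"failed to create actor $name")
    }
  }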
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
deleted file mode 100644
index d5f48a7..0000000
--- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.actor.Actor.Receive
-import akka.actor._
-import akka.pattern.ask
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
-import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
-import io.gearpump.cluster.MasterToAppMaster.WorkerList
-import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult}
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, StartExecutorSystems}
-import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.transport.HostPort
-
-object ActorUtil {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def getSystemAddress(system: ActorSystem): Address = {
-    system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
-  }
-
-  def getFullPath(system: ActorSystem, path: ActorPath): String = {
-    path.toStringWithAddress(getSystemAddress(system))
-  }
-
-  def getHostname(actor: ActorRef): String = {
-    val path = actor.path
-    path.address.host.getOrElse("localhost")
-  }
-
-  def defaultMsgHandler(actor: ActorRef): Receive = {
-    case msg: Any =>
-      LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, forwarded from $actor")
-  }
-
-  def printActorSystemTree(system: ActorSystem): Unit = {
-    val extendedSystem = system.asInstanceOf[ExtendedActorSystem]
-    val clazz = system.getClass
-    val m = clazz.getDeclaredMethod("printTree")
-    m.setAccessible(true)
-    LOG.info(m.invoke(system).asInstanceOf[String])
-  }
-
-  /** Checks whether an actor is a child of the given parent by simply comparing names */
-  // TODO: fix this; we should also check the full path to the root, not just the name
-  def isChildActorPath(parent: ActorRef, child: ActorRef): Boolean = {
-    if (null != child) {
-      parent.path.name == child.path.parent.name
-    } else {
-      false
-    }
-  }
-
-  def actorNameForExecutor(appId: Int, executorId: Int): String = "app" + appId + "-executor" +
-    executorId
-
-  // TODO: Currently we explicitly require the master contacts to be started with this path pattern
-  // akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER
-  def getMasterActorPath(master: HostPort): ActorPath = {
-    import io.gearpump.util.Constants.MASTER
-    ActorPath.fromString(s"akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER")
-  }
-
-  def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig,
-      sender: ActorRef)(implicit executor: scala.concurrent.ExecutionContext): Unit = {
-    implicit val timeout = Constants.FUTURE_TIMEOUT
-
-    (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list =>
-      val resources = list.workers.map {
-        workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)
-      }.toArray
-
-      master.tell(StartExecutorSystems(resources, executorJvmConfig), sender)
-    }
-  }
-
-  def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext)
-    : Future[T] = {
-    implicit val timeout = Constants.FUTURE_TIMEOUT
-    val appmaster = askActor[ResolveAppIdResult](master, ResolveAppId(appId)).flatMap { result =>
-      if (result.appMaster.isSuccess) {
-        Future.successful(result.appMaster.get)
-      } else {
-        Future.failed(result.appMaster.failed.get)
-      }
-    }
-    appmaster.flatMap(askActor[T](_, msg))
-  }
-
-  def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext)
-    : Future[T] = {
-    implicit val timeout = Constants.FUTURE_TIMEOUT
-    val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId))
-      .flatMap { result =>
-        if (result.worker.isSuccess) {
-          Future.successful(result.worker.get)
-        } else {
-          Future.failed(result.worker.failed.get)
-        }
-      }
-    worker.flatMap(askActor[T](_, msg))
-  }
-
-  def askActor[T](actor: ActorRef, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
-    implicit val timeout = Constants.FUTURE_TIMEOUT
-    (actor ? msg).asInstanceOf[Future[T]]
-  }
-}

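ActorUtil above collects the ask-pattern plumbing shared across the cluster code; each helper uses Constants.FUTURE_TIMEOUT and casts the reply Future to the requested type. A brief sketch that asks the master for its worker list, with getMasterActorPath shown only to illustrate the expected address pattern:

  import scala.concurrent.{ExecutionContext, Future}

  import akka.actor.ActorRef

  import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
  import io.gearpump.cluster.MasterToAppMaster.WorkerList
  import io.gearpump.transport.HostPort
  import io.gearpump.util.ActorUtil

  object MasterQuerySketch {
    def workerCount(master: ActorRef, masterAddress: HostPort)
        (implicit ec: ExecutionContext): Future[Int] = {
      // The master is expected at akka.tcp://<MASTER>@host:port/user/<MASTER>.
      val path = ActorUtil.getMasterActorPath(masterAddress)
      println(s"resolving master at $path")

      // askActor is an unchecked cast of the reply, so the type parameter
      // must match what the master actually sends back.
      ActorUtil.askActor[WorkerList](master, GetAllWorkers).map(_.workers.size)
    }
  }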
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/AkkaApp.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/AkkaApp.scala b/core/src/main/scala/io/gearpump/util/AkkaApp.scala
deleted file mode 100644
index 2b0bf61..0000000
--- a/core/src/main/scala/io/gearpump/util/AkkaApp.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.util.Try
-
-import io.gearpump.cluster.ClusterConfig
-
-/**
- * A helper trait for main classes that loads the Akka configuration automatically.
- */
-trait AkkaApp {
-
-  type Config = com.typesafe.config.Config
-
-  def main(akkaConf: Config, args: Array[String]): Unit
-
-  def help(): Unit
-
-  protected def akkaConfig: Config = {
-    ClusterConfig.default()
-  }
-
-  def main(args: Array[String]): Unit = {
-    Try {
-      main(akkaConfig, args)
-    }.failed.foreach { ex => help(); throw ex }
-  }
-}
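AkkaApp above keeps command-line entry points uniform: a concrete object only implements main(akkaConf, args) and help(), while the outer main supplies ClusterConfig.default() and calls help() before rethrowing on failure. A hypothetical tool following that pattern; the name and behaviour are illustrative:

  import akka.actor.ActorSystem

  import io.gearpump.util.AkkaApp

  object PingTool extends AkkaApp {

    override def help(): Unit = {
      Console.println("usage: PingTool <actor-system-name>")
    }

    override def main(akkaConf: Config, args: Array[String]): Unit = {
      // A missing argument fails here, which makes the outer main print help.
      require(args.length == 1, "expected exactly one argument")

      val system = ActorSystem(args(0), akkaConf)
      try {
        Console.println(s"actor system ${system.name} booted with Gearpump defaults")
      } finally {
        system.terminate()
      }
    }
  }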