You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:55:21 UTC

[40/57] [abbrv] incubator-predictionio git commit: [PIO-110] Refactoring

[PIO-110] Refactoring

Closes #425


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/6789dbeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/6789dbeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/6789dbeb

Branch: refs/heads/livedoc
Commit: 6789dbeb71b1cc7f13a385032da6fbc3b8cf7a12
Parents: 3856e5c
Author: Naoki Takezoe <ta...@apache.org>
Authored: Mon Aug 28 19:31:58 2017 +0900
Committer: Naoki Takezoe <ta...@apache.org>
Committed: Mon Aug 28 19:31:58 2017 +0900

----------------------------------------------------------------------
 .../authentication/KeyAuthentication.scala      |  4 +-
 .../apache/predictionio/controller/Engine.scala | 41 ++++++-----
 .../predictionio/controller/EngineParams.scala  |  4 +-
 .../controller/FastEvalEngine.scala             |  4 +-
 .../controller/IdentityPreparator.scala         |  2 -
 .../predictionio/controller/LAlgorithm.scala    | 15 ++--
 .../controller/MetricEvaluator.scala            | 10 +--
 .../predictionio/controller/P2LAlgorithm.scala  | 15 ++--
 .../predictionio/controller/PAlgorithm.scala    | 15 ++--
 .../core/SelfCleaningDataSource.scala           | 24 +++----
 .../predictionio/workflow/BatchPredict.scala    | 11 ++-
 .../predictionio/workflow/CreateServer.scala    | 24 +++----
 .../predictionio/workflow/CreateWorkflow.scala  |  8 +--
 .../workflow/EngineServerPluginContext.scala    |  4 +-
 .../predictionio/workflow/WorkflowContext.scala |  4 +-
 .../predictionio/workflow/WorkflowUtils.scala   | 17 ++---
 .../predictionio/workflow/index.scala.html      | 14 ++--
 .../predictionio/controller/EngineTest.scala    |  2 +-
 .../apache/predictionio/data/api/Common.scala   |  1 -
 .../predictionio/data/api/EventServer.scala     |  4 +-
 .../data/api/EventServerPluginContext.scala     |  4 +-
 .../apache/predictionio/data/api/Stats.scala    |  2 +-
 .../apache/predictionio/data/api/Webhooks.scala | 27 ++------
 .../data/storage/EngineInstances.scala          |  4 +-
 .../data/storage/EvaluationInstances.scala      |  4 +-
 .../data/storage/EventJson4sSupport.scala       |  3 +-
 .../data/storage/PEventAggregator.scala         |  2 -
 .../predictionio/data/storage/Storage.scala     |  4 +-
 .../predictionio/data/view/LBatchView.scala     |  1 -
 .../predictionio/data/view/QuickTest.scala      |  4 --
 .../e2/engine/BinaryVectorizer.scala            |  1 -
 .../e2/engine/CategoricalNaiveBayes.scala       |  1 -
 .../predictionio/e2/engine/MarkovChain.scala    |  1 -
 .../data/storage/elasticsearch/ESApps.scala     |  2 +-
 .../storage/elasticsearch/ESEventsUtil.scala    |  2 +-
 .../data/storage/elasticsearch/ESLEvents.scala  | 30 +-------
 .../data/storage/elasticsearch/ESPEvents.scala  |  3 +-
 .../data/storage/elasticsearch/ESUtils.scala    | 15 ++--
 .../storage/elasticsearch/ESAccessKeys.scala    |  2 +-
 .../data/storage/elasticsearch/ESApps.scala     |  2 +-
 .../elasticsearch/ESEngineInstances.scala       |  6 +-
 .../elasticsearch/ESEvaluationInstances.scala   |  4 +-
 .../data/storage/hbase/HBEventsUtil.scala       | 41 +++++------
 .../data/storage/hbase/upgrade/HB_0_8_0.scala   |  2 +-
 .../predictionio/tools/RunBatchPredict.scala    |  4 +-
 .../apache/predictionio/tools/RunWorkflow.scala | 20 +++---
 .../org/apache/predictionio/tools/Runner.scala  |  2 +-
 .../tools/admin/CommandClient.scala             |  4 +-
 .../predictionio/tools/commands/App.scala       | 72 ++++++++++----------
 .../tools/commands/Management.scala             |  2 +-
 .../predictionio/tools/console/Console.scala    |  8 +--
 51 files changed, 207 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
----------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
index 3ebc0b4..fa950aa 100644
--- a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
+++ b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala
@@ -49,12 +49,12 @@ trait KeyAuthentication {
 
         val passedKey = accessKeyParamOpt.getOrElse {
           Left(AuthenticationFailedRejection(
-            AuthenticationFailedRejection.CredentialsRejected, List()))
+            AuthenticationFailedRejection.CredentialsRejected, Nil))
         }
 
         if (!ServerKey.authEnforced || passedKey.equals(ServerKey.get)) Right(ctx.request)
         else Left(AuthenticationFailedRejection(
-          AuthenticationFailedRejection.CredentialsRejected, List()))
+          AuthenticationFailedRejection.CredentialsRejected, Nil))
 
       }
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Engine.scala b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
index 436c542..1f9d0ab 100644
--- a/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
+++ b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
@@ -38,7 +38,6 @@ import org.apache.predictionio.workflow.StopAfterReadInterruption
 import org.apache.predictionio.workflow.WorkflowParams
 import org.apache.predictionio.workflow.WorkflowUtils
 import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 import org.json4s._
 import org.json4s.native.JsonMethods._
@@ -255,13 +254,12 @@ class Engine[TD, EI, PD, Q, P, A](
                 s"Loaded model ${m.getClass.getName} for algorithm " +
                 s"${algo.getClass.getName}")
               sc.stop
-              m
             } catch {
               case e: NullPointerException =>
                 logger.warn(
                   s"Null model detected for algorithm ${algo.getClass.getName}")
-                m
             }
+            m
           }
         }  // model match
       }
@@ -692,7 +690,7 @@ object Engine {
     val models: Seq[Any] = algorithmList.map(_.trainBase(sc, pd))
 
     if (!params.skipSanityCheck) {
-      models.foreach { model => {
+      models.foreach { model =>
         model match {
           case sanityCheckable: SanityCheck => {
             logger.info(s"${model.getClass.getName} supports data sanity" +
@@ -704,7 +702,7 @@ object Engine {
               " data sanity check. Skipping check.")
           }
         }
-      }}
+      }
     }
 
     logger.info("EngineWorkflow.train completed")
@@ -758,49 +756,49 @@ object Engine {
       .mapValues(_._3)
       .mapValues{ _.zipWithUniqueId().map(_.swap) }
 
-    val preparedMap: Map[EX, PD] = evalTrainMap.mapValues { td => {
+    val preparedMap: Map[EX, PD] = evalTrainMap.mapValues { td =>
       preparator.prepareBase(sc, td)
-    }}
+    }
 
-    val algoModelsMap: Map[EX, Map[AX, Any]] = preparedMap.mapValues { pd => {
+    val algoModelsMap: Map[EX, Map[AX, Any]] = preparedMap.mapValues { pd =>
       algoMap.mapValues(_.trainBase(sc,pd))
-    }}
+    }
 
     val suppQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalQAsMap.mapValues { qas =>
       qas.map { case (qx, (q, a)) => (qx, (serving.supplementBase(q), a)) }
     }
 
     val algoPredictsMap: Map[EX, RDD[(QX, Seq[P])]] = (0 until evalCount)
-    .map { ex => {
+    .map { ex =>
       val modelMap: Map[AX, Any] = algoModelsMap(ex)
 
       val qs: RDD[(QX, Q)] = suppQAsMap(ex).mapValues(_._1)
 
       val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount)
-      .map { ax => {
+      .map { ax =>
         val algo = algoMap(ax)
         val model = modelMap(ax)
         val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(sc, model, qs)
-        val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { case (qx, p) => {
+        val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { case (qx, p) =>
           (qx, (ax, p))
-        }}
+        }
         predicts
-      }}
+      }
 
       val unionAlgoPredicts: RDD[(QX, Seq[P])] = sc.union(algoPredicts)
       .groupByKey()
-      .mapValues { ps => {
+      .mapValues { ps =>
         assert (ps.size == algoCount, "Must have same length as algoCount")
         // TODO. Check size == algoCount
         ps.toSeq.sortBy(_._1).map(_._2)
-      }}
+      }
 
       (ex, unionAlgoPredicts)
-    }}
+    }
     .toMap
 
     val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap
-    .map { case (ex, psMap) => {
+    .map { case (ex, psMap) =>
       // The query passed to serving.serve is the original one, not
       // supplemented.
       val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex)
@@ -811,12 +809,11 @@ object Engine {
         case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a)
       }
       (ex, qpaMap)
-    }}
+    }
 
-    (0 until evalCount).map { ex => {
+    (0 until evalCount).map { ex =>
       (evalInfoMap(ex), servingQPAMap(ex))
-    }}
-    .toSeq
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
index 6dccd4a..8068eaa 100644
--- a/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
+++ b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
@@ -35,7 +35,7 @@ import scala.language.implicitConversions
 class EngineParams(
     val dataSourceParams: (String, Params) = ("", EmptyParams()),
     val preparatorParams: (String, Params) = ("", EmptyParams()),
-    val algorithmParamsList: Seq[(String, Params)] = Seq(),
+    val algorithmParamsList: Seq[(String, Params)] = Nil,
     val servingParams: (String, Params) = ("", EmptyParams()))
   extends Serializable {
 
@@ -102,7 +102,7 @@ object EngineParams {
     dataSourceParams: Params = EmptyParams(),
     preparatorName: String = "",
     preparatorParams: Params = EmptyParams(),
-    algorithmParamsList: Seq[(String, Params)] = Seq(),
+    algorithmParamsList: Seq[(String, Params)] = Nil,
     servingName: String = "",
     servingParams: Params = EmptyParams()): EngineParams = {
       new EngineParams(

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
index e046b62..d128776 100644
--- a/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
+++ b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
@@ -270,10 +270,10 @@ object FastEvalEngineWorkflow  {
     workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
     engineParamsList: Seq[EngineParams])
   : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
-    engineParamsList.map { engineParams => {
+    engineParamsList.map { engineParams =>
       (engineParams,
         getServingResult(workflow, new ServingPrefix(engineParams)))
-    }}
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
index a82f493..8256142 100644
--- a/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
+++ b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
@@ -22,8 +22,6 @@ import org.apache.predictionio.core.BaseDataSource
 import org.apache.predictionio.core.BasePreparator
 import org.apache.spark.SparkContext
 
-import scala.reflect._
-
 /** A helper concrete implementation of [[org.apache.predictionio.core.BasePreparator]]
   * that passes training data through without any special preparation. This can
   * be used in place for both [[PPreparator]] and [[LPreparator]].

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
index 9e973e4..27d1d14 100644
--- a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
+++ b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
@@ -119,15 +119,12 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P]
     bm: Any): Any = {
     // Check RDD[M].count == 1
     val m = bm.asInstanceOf[RDD[M]].first()
-    if (m.isInstanceOf[PersistentModel[_]]) {
-      if (m.asInstanceOf[PersistentModel[Params]].save(
-        modelId, algoParams, sc)) {
-        PersistentModelManifest(className = m.getClass.getName)
-      } else {
-        ()
-      }
-    } else {
-      m
+    m match {
+      case m: PersistentModel[Params] @unchecked =>
+        if(m.save(modelId, algoParams, sc)){
+          PersistentModelManifest(className = m.getClass.getName)
+        } else ()
+      case _ => m
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala b/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala
index 73ecbe4..fc5ec15 100644
--- a/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala
+++ b/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala
@@ -222,9 +222,8 @@ class MetricEvaluator[EI, Q, P, A, R] (
     params: WorkflowParams): MetricEvaluatorResult[R] = {
 
     val evalResultList: Seq[(EngineParams, MetricScores[R])] = engineEvalDataSet
-    .zipWithIndex
     .par
-    .map { case ((engineParams, evalDataSet), idx) =>
+    .map { case (engineParams, evalDataSet) =>
       val metricScores = MetricScores[R](
         metric.calculate(sc, evalDataSet),
         otherMetrics.map(_.calculate(sc, evalDataSet)))
@@ -235,15 +234,16 @@ class MetricEvaluator[EI, Q, P, A, R] (
     implicit lazy val formats = Utils.json4sDefaultFormats +
       new NameParamsSerializer
 
-    evalResultList.zipWithIndex.foreach { case ((ep, r), idx) =>
+    val evalResultListWithIndex = evalResultList.zipWithIndex
+
+    evalResultListWithIndex.foreach { case ((ep, r), idx) =>
       logger.info(s"Iteration $idx")
       logger.info(s"EngineParams: ${JsonExtractor.engineParamsToJson(Both, ep)}")
       logger.info(s"Result: $r")
     }
 
     // use max. take implicit from Metric.
-    val ((bestEngineParams, bestScore), bestIdx) = evalResultList
-    .zipWithIndex
+    val ((bestEngineParams, bestScore), bestIdx) = evalResultListWithIndex
     .reduce { (x, y) =>
       if (metric.compare(x._1._2.score, y._1._2.score) >= 0) x else y
     }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala
index ede8dc2..c617d2c 100644
--- a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala
+++ b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala
@@ -110,15 +110,12 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
     algoParams: Params,
     bm: Any): Any = {
     val m = bm.asInstanceOf[M]
-    if (m.isInstanceOf[PersistentModel[_]]) {
-      if (m.asInstanceOf[PersistentModel[Params]].save(
-        modelId, algoParams, sc)) {
-        PersistentModelManifest(className = m.getClass.getName)
-      } else {
-        ()
-      }
-    } else {
-      m
+    m match {
+      case m: PersistentModel[Params] @unchecked =>
+        if(m.save(modelId, algoParams, sc)){
+          PersistentModelManifest(className = m.getClass.getName)
+        } else ()
+      case _ => m
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala
index 3419de3..55f8363 100644
--- a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala
+++ b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala
@@ -115,15 +115,12 @@ abstract class PAlgorithm[PD, M, Q, P]
     algoParams: Params,
     bm: Any): Any = {
     val m = bm.asInstanceOf[M]
-    if (m.isInstanceOf[PersistentModel[_]]) {
-      if (m.asInstanceOf[PersistentModel[Params]].save(
-        modelId, algoParams, sc)) {
-        PersistentModelManifest(className = m.getClass.getName)
-      } else {
-        ()
-      }
-    } else {
-      ()
+    m match {
+      case m: PersistentModel[Params] @unchecked =>
+        if(m.save(modelId, algoParams, sc)){
+          PersistentModelManifest(className = m.getClass.getName)
+        } else ()
+      case _ => ()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
index 3520d80..cadf6b8 100644
--- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
+++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
@@ -93,7 +93,6 @@ trait SelfCleaningDataSource {
     */
   @DeveloperApi
   def getCleanedLEvents(lEvents: Iterable[Event]): Iterable[Event] = {
-
     eventWindow
       .flatMap(_.duration)
       .map { duration =>
@@ -101,7 +100,7 @@ trait SelfCleaningDataSource {
         lEvents.filter(e =>
           e.eventTime.isAfter(DateTime.now().minus(fd.toMillis)) || isSetEvent(e)
         )
-      }.getOrElse(lEvents).toIterable
+      }.getOrElse(lEvents)
   }
 
   def compressPProperties(sc: SparkContext, rdd: RDD[Event]): RDD[Event] = {
@@ -117,7 +116,7 @@ trait SelfCleaningDataSource {
   }
 
   def compressLProperties(events: Iterable[Event]): Iterable[Event] = {
-    events.filter(isSetEvent).toIterable
+    events.filter(isSetEvent)
       .groupBy(_.entityType)
       .map { pair =>
         val (_, ls) = pair
@@ -164,7 +163,7 @@ trait SelfCleaningDataSource {
         val result = cleanPEvents(sc)
         val originalEvents = PEventStore.find(appName)(sc)
         val newEvents = result subtract originalEvents
-        val eventsToRemove = (originalEvents subtract result).map { case e =>
+        val eventsToRemove = (originalEvents subtract result).map { e =>
           e.eventId.getOrElse("")
         }
 
@@ -187,7 +186,7 @@ trait SelfCleaningDataSource {
 
   def removeEvents(eventsToRemove: Set[String], appId: Int) {
     val listOfFuture: List[Future[Boolean]] = eventsToRemove
-      .filter(x =>  x != "").toList.map { case eventId =>
+      .filter(x =>  x != "").toList.map { eventId =>
       lEventsDb.futureDelete(eventId, appId)
     }
 
@@ -202,9 +201,8 @@ trait SelfCleaningDataSource {
 
   /** Replace events in Event Store
     *
-    * @param events new events
-    * @param appId delete all events of appId
-    * @param channelId delete all events of channelId
+    * @param newEvents new events
+    * @param eventsToRemove event ids to remove
     */
   def wipe(
     newEvents: Set[Event],
@@ -212,7 +210,7 @@ trait SelfCleaningDataSource {
   ): Unit = {
     val (appId, channelId) = Common.appNameToId(appName, None)
 
-    val listOfFutureNewEvents: List[Future[String]] = newEvents.toList.map { case event =>
+    val listOfFutureNewEvents: List[Future[String]] = newEvents.toList.map { event =>
       lEventsDb.futureInsert(recreateEvent(event, None, event.eventTime), appId)
     }
 
@@ -233,10 +231,10 @@ trait SelfCleaningDataSource {
 
     val rdd = eventWindow match {
       case Some(ew) =>
-        var updated =
+        val updated =
           if (ew.compressProperties) compressPProperties(sc, pEvents) else pEvents
 
-        val deduped = if (ew.removeDuplicates) removePDuplicates(sc,updated) else updated
+        val deduped = if (ew.removeDuplicates) removePDuplicates(sc, updated) else updated
         deduped
       case None =>
         pEvents
@@ -258,7 +256,7 @@ trait SelfCleaningDataSource {
         val result = cleanLEvents().toSet
         val originalEvents = LEventStore.find(appName).toSet
         val newEvents = result -- originalEvents
-        val eventsToRemove = (originalEvents -- result).map { case e =>
+        val eventsToRemove = (originalEvents -- result).map { e =>
           e.eventId.getOrElse("")
         }
 
@@ -278,7 +276,7 @@ trait SelfCleaningDataSource {
 
     val events = eventWindow match {
       case Some(ew) =>
-        var updated =
+        val updated =
           if (ew.compressProperties) compressLProperties(lEvents) else lEvents
         val deduped = if (ew.removeDuplicates) removeLDuplicates(updated) else updated
         deduped

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
index 2fb0545..5420638 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
@@ -134,12 +134,11 @@ object BatchPredict extends Logging {
     val maybeEngine = engineFactory()
 
     // EngineFactory return a base engine, which may not be deployable.
-    if (!maybeEngine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
-      throw new NoSuchMethodException(
+    maybeEngine match {
+      case e: Engine[_, _, _, _, _, _] => e
+      case _ => throw new NoSuchMethodException(
         s"Engine $maybeEngine cannot be used for batch predict")
     }
-
-    maybeEngine.asInstanceOf[Engine[_,_,_,_,_,_]]
   }
 
   def run[Q, P](
@@ -207,8 +206,8 @@ object BatchPredict extends Logging {
       // finally Serving.serve.
       val supplementedQuery = serving.supplementBase(query)
       // TODO: Parallelize the following.
-      val predictions = algorithms.zipWithIndex.map { case (a, ai) =>
-        a.predictBase(models(ai), supplementedQuery)
+      val predictions = algorithms.zip(models).map { case (a, m) =>
+        a.predictBase(m, supplementedQuery)
       }
       // Notice that it is by design to call Serving.serve with the
       // *original* query.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
index 8f0aed7..2447682 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala
@@ -18,7 +18,7 @@
 
 package org.apache.predictionio.workflow
 
-import java.io.{PrintWriter, Serializable, StringWriter}
+import java.io.Serializable
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
@@ -32,6 +32,7 @@ import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
 import com.typesafe.config.ConfigFactory
 import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
 import grizzled.slf4j.Logging
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.predictionio.authentication.KeyAuthentication
 import org.apache.predictionio.configuration.SSLConfiguration
 import org.apache.predictionio.controller.{Engine, Params, Utils, WithPrId}
@@ -309,7 +310,7 @@ class MasterActor (
       sprayHttpListener.map { l =>
         log.info("Server is shutting down.")
         l ! Http.Unbind(5.seconds)
-        system.shutdown
+        system.shutdown()
       } getOrElse {
         log.warning("No active server is running.")
       }
@@ -353,7 +354,7 @@ class MasterActor (
         }
       } else {
         log.error("Bind failed. Shutting down.")
-        system.shutdown
+        system.shutdown()
       }
   }
 
@@ -432,13 +433,6 @@ class ServerActor[Q, P](
     }
   }
 
-  def getStackTraceString(e: Throwable): String = {
-    val writer = new StringWriter()
-    val printWriter = new PrintWriter(writer)
-    e.printStackTrace(printWriter)
-    writer.toString
-  }
-
   val myRoute =
     path("") {
       get {
@@ -492,8 +486,8 @@ class ServerActor[Q, P](
               // finally Serving.serve.
               val supplementedQuery = serving.supplementBase(query)
               // TODO: Parallelize the following.
-              val predictions = algorithms.zipWithIndex.map { case (a, ai) =>
-                a.predictBase(models(ai), supplementedQuery)
+              val predictions = algorithms.zip(models).map { case (a, m) =>
+                a.predictBase(m, supplementedQuery)
               }
               // Notice that it is by design to call Serving.serve with the
               // *original* query.
@@ -533,7 +527,7 @@ class ServerActor[Q, P](
                     case id: WithPrId =>
                       Map("prId" -> id.prId)
                     case _ =>
-                      Map()
+                      Map.empty
                   }
                 val data = Map(
                   // "appId" -> dataSourceParams.asInstanceOf[ParamsWithAppId].appId,
@@ -596,7 +590,7 @@ class ServerActor[Q, P](
             } catch {
               case e: MappingException =>
                 val msg = s"Query:\n$queryString\n\nStack Trace:\n" +
-                  s"${getStackTraceString(e)}\n\n"
+                  s"${ExceptionUtils.getStackTrace(e)}\n\n"
                 log.error(msg)
                 args.logUrl map { url =>
                   remoteLog(
@@ -607,7 +601,7 @@ class ServerActor[Q, P](
                 complete(StatusCodes.BadRequest, e.getMessage)
               case e: Throwable =>
                 val msg = s"Query:\n$queryString\n\nStack Trace:\n" +
-                  s"${getStackTraceString(e)}\n\n"
+                  s"${ExceptionUtils.getStackTrace(e)}\n\n"
                 log.error(msg)
                 args.logUrl map { url =>
                   remoteLog(

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
index 899ace2..303ed06 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
@@ -165,14 +165,14 @@ object CreateWorkflow extends Logging {
       }
     }
 
-    val pioEnvVars = wfc.env.map(e =>
-      e.split(',').flatMap(p =>
+    val pioEnvVars = wfc.env.map { e =>
+      e.split(',').flatMap { p =>
         p.split('=') match {
           case Array(k, v) => List(k -> v)
           case _ => Nil
         }
-      ).toMap
-    ).getOrElse(Map())
+      }.toMap
+    }.getOrElse(Map.empty)
 
     if (evaluation.isEmpty) {
       val variantJson = parse(stringFromFile(wfc.engineVariant))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala b/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
index 78d86ac..cfc83eb 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
@@ -41,9 +41,9 @@ class EngineServerPluginContext(
     val pluginParams: mutable.Map[String, JValue],
     val log: LoggingAdapter) {
   def outputBlockers: Map[String, EngineServerPlugin] =
-    plugins.getOrElse(EngineServerPlugin.outputBlocker, Map()).toMap
+    plugins.getOrElse(EngineServerPlugin.outputBlocker, Map.empty).toMap
   def outputSniffers: Map[String, EngineServerPlugin] =
-    plugins.getOrElse(EngineServerPlugin.outputSniffer, Map()).toMap
+    plugins.getOrElse(EngineServerPlugin.outputSniffer, Map.empty).toMap
 }
 
 object EngineServerPluginContext extends Logging {

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala
index 7bd9117..c1e6937 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala
@@ -28,8 +28,8 @@ import scala.language.existentials
 object WorkflowContext extends Logging {
   def apply(
       batch: String = "",
-      executorEnv: Map[String, String] = Map(),
-      sparkEnv: Map[String, String] = Map(),
+      executorEnv: Map[String, String] = Map.empty,
+      sparkEnv: Map[String, String] = Map.empty,
       mode: String = ""
     ): SparkContext = {
     val conf = new SparkConf()

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
index 0e578be..9a75415 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
@@ -60,12 +60,11 @@ object WorkflowUtils extends Logging {
         engineObject.instance.asInstanceOf[EngineFactory]
       )
     } catch {
-      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
+      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) =>
         (
           EngineLanguage.Java,
           Class.forName(engine).newInstance.asInstanceOf[EngineFactory]
         )
-      }
     }
   }
 
@@ -80,12 +79,11 @@ object WorkflowUtils extends Logging {
         epgObject.instance.asInstanceOf[EngineParamsGenerator]
       )
     } catch {
-      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
+      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) =>
         (
           EngineLanguage.Java,
           Class.forName(epg).newInstance.asInstanceOf[EngineParamsGenerator]
         )
-      }
     }
   }
 
@@ -99,12 +97,11 @@ object WorkflowUtils extends Logging {
         evaluationObject.instance.asInstanceOf[Evaluation]
       )
     } catch {
-      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
+      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) =>
         (
           EngineLanguage.Java,
           Class.forName(evaluation).newInstance.asInstanceOf[Evaluation]
         )
-      }
     }
   }
 
@@ -265,9 +262,9 @@ object WorkflowUtils extends Logging {
           Seq(file.toURI)
         } else {
           warn(s"Environment variable $p is pointing to a nonexistent file $f. Ignoring.")
-          Seq.empty[URI]
+          Seq.empty
         }
-      } getOrElse Seq.empty[URI]
+      } getOrElse Seq.empty
     )
   }
 
@@ -325,8 +322,8 @@ object WorkflowUtils extends Logging {
           error("Arrays are not allowed in the sparkConf section of engine.js.")
           sys.exit(1)
         }
-        case JNothing => List()
-        case _ => List(List() -> jv.values.toString)
+        case JNothing => Nil
+        case _ => List(Nil -> jv.values.toString)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
index 62fe5a7..5040796 100644
--- a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
+++ b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
@@ -75,13 +75,13 @@
       <h2>Algorithms and Models</h2>
         <table class="table table-bordered table-striped">
           <tr><th>#</th><th colspan="2">Information</th></tr>
-          @for(a <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) {
-          <tr>
-            <th rowspan="3">@{a._2 + 1}</th>
-            <th>Class</th><td>@{a._1._1._1}</td>
-          </tr>
-          <tr><th>Parameters</th><td>@{a._1._1._2}</td></tr>
-          <tr><th>Model</th><td>@{a._1._2}</td></tr>
+          @for((((algo, param), model), i) <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) {
+            <tr>
+              <th rowspan="3">@{i + 1}</th>
+              <th>Class</th><td>@{algo}</td>
+            </tr>
+            <tr><th>Parameters</th><td>@{param}</td></tr>
+            <tr><th>Model</th><td>@{model}</td></tr>
           }
         </table>
       <h2>Serving</h2>

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala b/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala
index 94879a5..fb10c94 100644
--- a/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala
+++ b/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala
@@ -320,7 +320,7 @@ class EngineTrainSuite extends FunSuite with SharedSparkContext {
       sc,
       new PDataSource0(0),
       new PPreparator0(1),
-      Seq(),
+      Nil,
       defaultWorkflowParams
     )
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
index a579add..60efea2 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
@@ -25,7 +25,6 @@ import spray.routing._
 import spray.routing.Directives._
 import spray.routing.Rejection
 import spray.http.StatusCodes
-import spray.http.StatusCode
 import spray.httpx.Json4sSupport
 
 import org.json4s.Formats

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
index 75c2227..41dfefb 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
@@ -131,13 +131,13 @@ class  EventServiceActor(
 
   private val FailedAuth = Left(
     AuthenticationFailedRejection(
-      AuthenticationFailedRejection.CredentialsRejected, List()
+      AuthenticationFailedRejection.CredentialsRejected, Nil
     )
   )
 
   private val MissedAuth = Left(
     AuthenticationFailedRejection(
-      AuthenticationFailedRejection.CredentialsMissing, List()
+      AuthenticationFailedRejection.CredentialsMissing, Nil
     )
   )
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
index 36f3f73..cd14cc4 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala
@@ -30,10 +30,10 @@ class EventServerPluginContext(
     val plugins: mutable.Map[String, mutable.Map[String, EventServerPlugin]],
     val log: LoggingAdapter) {
   def inputBlockers: Map[String, EventServerPlugin] =
-    plugins.getOrElse(EventServerPlugin.inputBlocker, Map()).toMap
+    plugins.getOrElse(EventServerPlugin.inputBlocker, Map.empty).toMap
 
   def inputSniffers: Map[String, EventServerPlugin] =
-    plugins.getOrElse(EventServerPlugin.inputSniffer, Map()).toMap
+    plugins.getOrElse(EventServerPlugin.inputSniffer, Map.empty).toMap
 }
 
 object EventServerPluginContext extends Logging {

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
index ba2b575..9bbbc2e 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala
@@ -67,7 +67,7 @@ class Stats(val startTime: DateTime) {
     m
     .toSeq
     .flatMap { case (k, v) =>
-      if (k._1 == appId) { Seq(KV(k._2, v)) } else { Seq() }
+      if (k._1 == appId) { Seq(KV(k._2, v)) } else { Nil }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
index 87a4600..57be037 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala
@@ -18,22 +18,13 @@
 
 package org.apache.predictionio.data.api
 
-import org.apache.predictionio.data.webhooks.JsonConnector
-import org.apache.predictionio.data.webhooks.FormConnector
 import org.apache.predictionio.data.webhooks.ConnectorUtil
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventJson4sSupport
 import org.apache.predictionio.data.storage.LEvents
 
-import spray.routing._
-import spray.routing.Directives._
 import spray.http.StatusCodes
 import spray.http.StatusCode
 import spray.http.FormData
-import spray.httpx.Json4sSupport
 
-import org.json4s.Formats
-import org.json4s.DefaultFormats
 import org.json4s.JObject
 
 import akka.event.LoggingAdapter
@@ -61,14 +52,13 @@ private[predictionio] object Webhooks {
       }
     }
 
-    eventFuture.flatMap { eventOpt =>
-      if (eventOpt.isEmpty) {
+    eventFuture.flatMap {
+      case None =>
         Future successful {
           val message = s"webhooks connection for ${web} is not supported."
           (StatusCodes.NotFound, Map("message" -> message))
         }
-      } else {
-        val event = eventOpt.get
+      case Some(event) =>
         val data = eventClient.futureInsert(event, appId, channelId).map { id =>
           val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
 
@@ -78,7 +68,6 @@ private[predictionio] object Webhooks {
           result
         }
         data
-      }
     }
   }
 
@@ -114,14 +103,13 @@ private[predictionio] object Webhooks {
       }
     }
 
-    eventFuture.flatMap { eventOpt =>
-      if (eventOpt.isEmpty) {
-        Future {
+    eventFuture.flatMap {
+      case None =>
+        Future successful {
           val message = s"webhooks connection for ${web} is not supported."
           (StatusCodes.NotFound, Map("message" -> message))
         }
-      } else {
-        val event = eventOpt.get
+      case Some(event) =>
         val data = eventClient.futureInsert(event, appId, channelId).map { id =>
           val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
 
@@ -131,7 +119,6 @@ private[predictionio] object Webhooks {
           result
         }
         data
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
index 7c3aad0..82d62f8 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala
@@ -118,8 +118,8 @@ class EngineInstanceSerializer
           engineVariant = "",
           engineFactory = "",
           batch = "",
-          env = Map(),
-          sparkConf = Map(),
+          env = Map.empty,
+          sparkConf = Map.empty,
           dataSourceParams = "",
           preparatorParams = "",
           algorithmsParams = "",

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
index a40adb3..5714fde 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala
@@ -47,8 +47,8 @@ case class EvaluationInstance(
   evaluationClass: String = "",
   engineParamsGeneratorClass: String = "",
   batch: String = "",
-  env: Map[String, String] = Map(),
-  sparkConf: Map[String, String] = Map(),
+  env: Map[String, String] = Map.empty,
+  sparkConf: Map[String, String] = Map.empty,
   evaluatorResults: String = "",
   evaluatorResultsHTML: String = "",
   evaluatorResultsJSON: String = "")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
index ba20c61..57f0472 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala
@@ -56,7 +56,7 @@ object EventJson4sSupport {
         val targetEntityType = fields.getOpt[String]("targetEntityType")
         val targetEntityId = fields.getOpt[String]("targetEntityId")
         val properties = fields.getOrElse[Map[String, JValue]](
-          "properties", Map())
+          "properties", Map.empty)
         // default currentTime expressed as UTC timezone
         lazy val currentTime = DateTime.now(EventValidation.defaultTimeZone)
         val eventTime = fields.getOpt[String]("eventTime")
@@ -70,7 +70,6 @@ object EventJson4sSupport {
           }.getOrElse(currentTime)
 
         // disable tags from API for now.
-        val tags = List()
       // val tags = fields.getOpt[Seq[String]]("tags").getOrElse(List())
 
         val prId = fields.getOpt[String]("prId")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
index 8e58384..5cf8ffc 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala
@@ -22,8 +22,6 @@ import org.joda.time.DateTime
 
 import org.json4s.JValue
 
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 
 // each JValue data associated with the time it is set

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
index fd05767..52442a6 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
@@ -18,8 +18,6 @@
 
 package org.apache.predictionio.data.storage
 
-import java.lang.reflect.InvocationTargetException
-
 import grizzled.slf4j.Logging
 import org.apache.predictionio.annotation.DeveloperApi
 
@@ -78,7 +76,7 @@ trait BaseStorageClient {
 case class StorageClientConfig(
   parallel: Boolean = false, // parallelized access (RDD)?
   test: Boolean = false, // test mode config
-  properties: Map[String, String] = Map())
+  properties: Map[String, String] = Map.empty)
 
 /** :: DeveloperApi ::
   * Thrown when a StorageClient runs into an exceptional condition

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
index d9c53ef..4f531b6 100644
--- a/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
@@ -136,7 +136,6 @@ class EventSeq(val events: List[Event]) {
     events
     .groupBy( _.entityId )
     .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
-    .toMap
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
index 30f9c17..b6b509c 100644
--- a/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
@@ -18,10 +18,6 @@
 
 package org.apache.predictionio.data.view
 
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.LEvents
-import org.apache.predictionio.data.storage.EventValidation
-import org.apache.predictionio.data.storage.DataMap
 import org.apache.predictionio.data.storage.Storage
 
 import scala.concurrent.ExecutionContext.Implicits.global // TODO

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala
index 114c6b9..dae9074 100644
--- a/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala
+++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala
@@ -18,7 +18,6 @@
 package org.apache.predictionio.e2.engine
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.linalg.Vector
 import scala.collection.immutable.HashMap

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala
index 9dc6f9d..90b5267 100644
--- a/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala
+++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala
@@ -17,7 +17,6 @@
 
 package org.apache.predictionio.e2.engine
 
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 
 /** Class for training a naive Bayes model with categorical variables */

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala
index 3c3ac34..130c53a 100644
--- a/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala
+++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala
@@ -17,7 +17,6 @@
 
 package org.apache.predictionio.e2.engine
 
-import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
 import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
 import org.apache.spark.rdd.RDD

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 0b319ab..6afed12 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -141,7 +141,7 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
     try {
       val json =
         ("query" ->
-          ("match_all" -> List.empty))
+          ("match_all" -> Nil))
       ESUtils.getAll[App](restClient, index, estype, compact(render(json)))
     } catch {
       case e: IOException =>

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
index ec72a49..749ab49 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -80,7 +80,7 @@ object ESEventsUtil {
       targetEntityId = targetEntityId,
       properties = properties,
       eventTime = eventTime,
-      tags = Seq(),
+      tags = Nil,
       prId = prId,
       creationTime = creationTime
     )

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 6240059..6c0c4a7 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -28,7 +28,6 @@ import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.Event
 import org.apache.predictionio.data.storage.LEvents
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
 import org.joda.time.DateTime
 import org.json4s._
 import org.json4s.JsonDSL._
@@ -36,7 +35,6 @@ import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization.write
 import org.json4s.ext.JodaTimeSerializers
 import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
 import org.apache.http.message.BasicHeader
 
 class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
@@ -217,29 +215,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
     }
   }
 
-  private def exists(restClient: RestClient, estype: String, id: Int): Boolean = {
-    try {
-      restClient.performRequest(
-        "GET",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
-          case 200 => true
-          case _ => false
-        }
-    } catch {
-      case e: ResponseException =>
-        e.getResponse.getStatusLine.getStatusCode match {
-          case 404 => false
-          case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
-            false
-        }
-      case e: IOException =>
-        error(s"Failed to access to $index/$estype/$id", e)
-        false
-    }
-  }
-
   override def futureGet(
     eventId: String,
     appId: Int,
@@ -288,7 +263,8 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
         val response = restClient.performRequest(
           "POST",
           s"/$index/$estype/_delete_by_query",
-          Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava)
+          Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
+          entity)
         val jsonResponse = parse(EntityUtils.toString(response.getEntity))
         val result = (jsonResponse \ "result").extract[String]
         result match {
@@ -331,7 +307,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
       } catch {
         case e: IOException =>
           error(e.getMessage)
-          Iterator[Event]()
+          Iterator.empty
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
index b9ad8bb..9f0a188 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -120,7 +120,8 @@ class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
             val response = restClient.performRequest(
               "POST",
               s"/$index/$estype/_delete_by_query",
-              Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava)
+              Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
+              entity)
             val jsonResponse = parse(EntityUtils.toString(response.getEntity))
             val result = (jsonResponse \ "result").extract[String]
             result match {

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index 3f7b058..cd9aa53 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -44,16 +44,13 @@ object ESUtils {
     implicit formats: Formats): Event = {
     def getString(s: String): String = {
       (value \ s) match {
-        case x if x == JNothing => null
+        case JNothing => null
         case x => x.extract[String]
       }
     }
 
     def getOptString(s: String): Option[String] = {
-      getString(s) match {
-        case null => None
-        case x => Some(x)
-      }
+      Option(getString(s))
     }
 
     val properties: DataMap = getOptString("properties")
@@ -233,14 +230,14 @@ object ESUtils {
     targetEntityId: Option[Option[String]] = None,
     reversed: Option[Boolean] = None): String = {
     val mustQueries = Seq(
-      startTime.map(x => {
+      startTime.map { x =>
         val v = formatUTCDateTime(x)
         s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
-      }),
-      untilTime.map(x => {
+      },
+      untilTime.map { x =>
         val v = formatUTCDateTime(x)
         s"""{"range":{"eventTime":{"lt":"${v}"}}}"""
-      }),
+      },
       entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),
       entityId.map(x => s"""{"term":{"entityId":"${x}"}}"""),
       targetEntityType.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityType":"${x}"}}""")),

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 077168a..5e3abe2 100644
--- a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -95,7 +95,7 @@ class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
     } catch {
       case e: ElasticsearchException =>
         error(e.getMessage)
-        Seq[AccessKey]()
+        Nil
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 3781a4b..270af0e 100644
--- a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -105,7 +105,7 @@ class ESApps(client: Client, config: StorageClientConfig, index: String)
     } catch {
       case e: ElasticsearchException =>
         error(e.getMessage)
-        Seq[App]()
+        Nil
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 21690bf..2d6056b 100644
--- a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -107,7 +107,7 @@ class ESEngineInstances(client: Client, config: StorageClientConfig, index: Stri
     } catch {
       case e: ElasticsearchException =>
         error(e.getMessage)
-        Seq()
+        Nil
     }
   }
 
@@ -127,7 +127,7 @@ class ESEngineInstances(client: Client, config: StorageClientConfig, index: Stri
     } catch {
       case e: ElasticsearchException =>
         error(e.getMessage)
-        Seq()
+        Nil
     }
   }
 
@@ -150,7 +150,7 @@ class ESEngineInstances(client: Client, config: StorageClientConfig, index: Stri
 
   def delete(id: String): Unit = {
     try {
-      val response = client.prepareDelete(index, estype, id).get
+      client.prepareDelete(index, estype, id).get
     } catch {
       case e: ElasticsearchException => error(e.getMessage)
     }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 85bf820..68c5a74 100644
--- a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -101,7 +101,7 @@ class ESEvaluationInstances(client: Client, config: StorageClientConfig, index:
     } catch {
       case e: ElasticsearchException =>
         error(e.getMessage)
-        Seq()
+        Nil
     }
   }
 
@@ -114,7 +114,7 @@ class ESEvaluationInstances(client: Client, config: StorageClientConfig, index:
     } catch {
       case e: ElasticsearchException =>
         error(e.getMessage)
-        Seq()
+        Nil
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
index 2cdb734..64487fb 100644
--- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.client.Scan
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.filter.FilterList
-import org.apache.hadoop.hbase.filter.RegexStringComparator
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
 import org.apache.hadoop.hbase.filter.BinaryComparator
@@ -275,7 +274,7 @@ object HBEventsUtil {
       targetEntityId = targetEntityId,
       properties = properties,
       eventTime = eventTime,
-      tags = Seq(),
+      tags = Nil,
       prId = prId,
       creationTime = creationTime
     )
@@ -375,34 +374,28 @@ object HBEventsUtil {
       }
     }
 
-    targetEntityType.foreach { tetOpt =>
-      if (tetOpt.isEmpty) {
+    targetEntityType.foreach {
+      case None =>
         val filter = createSkipRowIfColumnExistFilter("targetEntityType")
         filters.addFilter(filter)
-      } else {
-        tetOpt.foreach { tet =>
-          val filter = createBinaryFilter(
-            "targetEntityType", Bytes.toBytes(tet))
-          // the entire row will be skipped if the column is not found.
-          filter.setFilterIfMissing(true)
-          filters.addFilter(filter)
-        }
-      }
+      case Some(tet) =>
+        val filter = createBinaryFilter(
+          "targetEntityType", Bytes.toBytes(tet))
+        // the entire row will be skipped if the column is not found.
+        filter.setFilterIfMissing(true)
+        filters.addFilter(filter)
     }
 
-    targetEntityId.foreach { teidOpt =>
-      if (teidOpt.isEmpty) {
+    targetEntityId.foreach {
+      case None =>
         val filter = createSkipRowIfColumnExistFilter("targetEntityId")
         filters.addFilter(filter)
-      } else {
-        teidOpt.foreach { teid =>
-          val filter = createBinaryFilter(
-            "targetEntityId", Bytes.toBytes(teid))
-          // the entire row will be skipped if the column is not found.
-          filter.setFilterIfMissing(true)
-          filters.addFilter(filter)
-        }
-      }
+      case Some(teid) =>
+        val filter = createBinaryFilter(
+          "targetEntityId", Bytes.toBytes(teid))
+        // the entire row will be skipped if the column is not found.
+        filter.setFilterIfMissing(true)
+        filters.addFilter(filter)
     }
 
     if (!filters.getFilters().isEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
index cc07fa4..795cf7e 100644
--- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
+++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala
@@ -185,7 +185,7 @@ object HB_0_8_0 {
       targetEntityId = targetEntityId,
       properties = properties,
       eventTime = new DateTime(rowKey.millis, eventTimeZone),
-      tags = Seq(),
+      tags = Nil,
       prId = prId,
       creationTime = creationTime
     )

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
index 35572c9..c76d203 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
@@ -59,10 +59,10 @@ object RunBatchPredict extends Logging {
       "--engine-variant",
       batchPredictArgs.variantJson.getOrElse(
         new File(engineDirPath, "engine.json")).getCanonicalPath) ++
-      (if (batchPredictArgs.queryPartitions.isEmpty) Seq()
+      (if (batchPredictArgs.queryPartitions.isEmpty) Nil
         else Seq("--query-partitions",
                   batchPredictArgs.queryPartitions.get.toString)) ++
-      (if (verbose) Seq("--verbose") else Seq()) ++
+      (if (verbose) Seq("--verbose") else Nil) ++
       Seq("--json-extractor", batchPredictArgs.jsonExtractor.toString)
 
     Runner.runOnSpark(

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
index 6df0750..a25f4e0 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
@@ -64,25 +64,25 @@ object RunWorkflow extends Logging {
       "--verbosity",
       wa.verbosity.toString) ++
       wa.engineFactory.map(
-        x => Seq("--engine-factory", x)).getOrElse(Seq()) ++
+        x => Seq("--engine-factory", x)).getOrElse(Nil) ++
       wa.engineParamsKey.map(
-        x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++
-      (if (wa.batch != "") Seq("--batch", wa.batch) else Seq()) ++
-      (if (verbose) Seq("--verbose") else Seq()) ++
-      (if (wa.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++
-      (if (wa.stopAfterRead) Seq("--stop-after-read") else Seq()) ++
+        x => Seq("--engine-params-key", x)).getOrElse(Nil) ++
+      (if (wa.batch != "") Seq("--batch", wa.batch) else Nil) ++
+      (if (verbose) Seq("--verbose") else Nil) ++
+      (if (wa.skipSanityCheck) Seq("--skip-sanity-check") else Nil) ++
+      (if (wa.stopAfterRead) Seq("--stop-after-read") else Nil) ++
       (if (wa.stopAfterPrepare) {
         Seq("--stop-after-prepare")
       } else {
-        Seq()
+        Nil
       }) ++
       wa.evaluation.map(x => Seq("--evaluation-class", x)).
-        getOrElse(Seq()) ++
+        getOrElse(Nil) ++
       // If engineParamsGenerator is specified, it overrides the evaluation.
       wa.engineParamsGenerator.orElse(wa.evaluation)
         .map(x => Seq("--engine-params-generator-class", x))
-        .getOrElse(Seq()) ++
-      (if (wa.batch != "") Seq("--batch", wa.batch) else Seq()) ++
+        .getOrElse(Nil) ++
+      (if (wa.batch != "") Seq("--batch", wa.batch) else Nil) ++
       Seq("--json-extractor", wa.jsonExtractor.toString)
 
     Runner.runOnSpark(

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
index 4a721be..4e266c8 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
@@ -30,7 +30,7 @@ import scala.sys.process._
 
 case class SparkArgs(
   sparkHome: Option[String] = None,
-  sparkPassThrough: Seq[String] = Seq(),
+  sparkPassThrough: Seq[String] = Nil,
   sparkKryo: Boolean = false,
   scratchUri: Option[URI] = None)
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
index 94a1a03..710c23a 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala
@@ -84,11 +84,11 @@ class CommandClient(
             val accessKey = AccessKey(
               key = "",
               appid = id,
-              events = Seq())
+              events = Nil)
             val accessKey2 = accessKeyClient.insert(AccessKey(
               key = "",
               appid = id,
-              events = Seq()))
+              events = Nil))
             accessKey2 map { k =>
               new AppNewResponse(1,"App created successfully.",id, req.name, k)
             } getOrElse {

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala
index 44fa667..5884ebd 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala
@@ -67,7 +67,7 @@ object App extends EitherLogging {
           val newKey = storage.AccessKey(
             key = accessKey,
             appid = id,
-            events = Seq())
+            events = Nil)
           accessKeys.insert(newKey)
           .map { k =>
             Right(AppDescription(
@@ -85,7 +85,7 @@ object App extends EitherLogging {
               errStr += s"""
                 |Failed to revert back the App meta-data change.
                 |The app ${name} CANNOT be used!
-                |Please run 'pio app delete ${name}' to delete this app!"""
+                |Please run 'pio app delete ${name}' to delete this app!""".stripMargin
           }
           logAndFail(errStr)
         }
@@ -209,12 +209,12 @@ object App extends EitherLogging {
               channels.find(ch => ch.name == chName) match {
                 case None =>
                   return logAndFail(s"""Unable to delete data for channel.
-                              |Channel ${chName} doesn't exist.""")
+                              |Channel ${chName} doesn't exist.""".stripMargin)
                 case Some(ch) => Seq(Some(ch.id))
               }
-              } getOrElse {
-                Seq(None) // for default channel
-              }
+            } getOrElse {
+              Seq(None) // for default channel
+            }
           }
 
         chanIdsToRemove.map { chId: Option[Int] =>
@@ -246,8 +246,7 @@ object App extends EitherLogging {
             errStr =
               if (chId.isDefined) {
                 s"Unable to initialize Event Store for the channel ID: ${chId.get}."
-              }
-              else {
+              } else {
                 s"Unable to initialize Event tore for the app ID: ${appDesc.app.id}."
               }
             error(errStr)
@@ -272,11 +271,11 @@ object App extends EitherLogging {
       show(appName).right flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) =>
         if (channels.find(ch => ch.name == newChannel).isDefined) {
           logAndFail(s"""Channel ${newChannel} already exists.
-                      |Unable to create new channel.""")
+                      |Unable to create new channel.""".stripMargin)
         } else if (!storage.Channel.isValidName(newChannel)) {
           logAndFail(s"""Unable to create new channel.
                       |The channel name ${newChannel} is invalid.
-                      |${storage.Channel.nameConstraint}""")
+                      |${storage.Channel.nameConstraint}""".stripMargin)
         } else {
 
           val channel = Channel(
@@ -299,7 +298,7 @@ object App extends EitherLogging {
               Right(channel.copy(id = chanId))
             } else {
               errStr = s"""Unable to create new channel.
-                          |Failed to initalize Event Store."""
+                          |Failed to initalize Event Store.""".stripMargin
               error(errStr)
               // reverted back the meta data
               try {
@@ -307,17 +306,17 @@ object App extends EitherLogging {
                 Left(errStr)
               } catch {
                 case e: Exception =>
-                  val nextErrStr = s"""
+                  val nextErrStr = (s"""
                     |Failed to revert back the Channel meta-data change.
                     |The channel ${newChannel} CANNOT be used!
                     |Please run 'pio app channel-delete ${appName} ${newChannel}'""" +
-                    " to delete this channel!"
+                    " to delete this channel!").stripMargin
                   logAndFail(errStr + nextErrStr)
               }
             }
           } getOrElse {
             logAndFail(s"""Unable to create new channel.
-                          |Failed to update Channel meta-data.""")
+                          |Failed to update Channel meta-data.""".stripMargin)
           }
         }
       }
@@ -329,33 +328,32 @@ object App extends EitherLogging {
   def channelDelete(appName: String, deleteChannel: String): MaybeError = {
     val chanStorage = storage.Storage.getMetaDataChannels
     val events = storage.Storage.getLEvents()
-    var errStr = ""
     try {
       show(appName).right.flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) =>
         val foundChannel = channels.find(ch => ch.name == deleteChannel)
-        if (foundChannel.isEmpty) {
-          logAndFail(s"""Unable to delete channel
-                        |Channel ${deleteChannel} doesn't exists.""")
-        } else {
-          val chId = foundChannel.get.id
-          val dbRemoved = events.remove(appDesc.app.id, Some(chId))
-          if (dbRemoved) {
-            info(s"Removed Event Store for this channel: ${deleteChannel}")
-            try {
-              chanStorage.delete(chId)
-              logAndSucceed(s"Deleted channel: ${deleteChannel}.")
-            } catch {
-              case e: Exception =>
-                logAndFail(s"""Unable to delete channel.
-                  |Failed to update Channel meta-data.
-                  |The channel ${deleteChannel} CANNOT be used!
-                  |Please run 'pio app channel-delete ${appDesc.app.name} ${deleteChannel}'""" +
-                  " to delete this channel again!")
+        foundChannel match {
+          case None =>
+            logAndFail(s"""Unable to delete channel
+                          |Channel ${deleteChannel} doesn't exists.""".stripMargin)
+          case Some(channel) =>
+            val dbRemoved = events.remove(appDesc.app.id, Some(channel.id))
+            if (dbRemoved) {
+              info(s"Removed Event Store for this channel: ${deleteChannel}")
+              try {
+                chanStorage.delete(channel.id)
+                logAndSucceed(s"Deleted channel: ${deleteChannel}.")
+              } catch {
+                case e: Exception =>
+                  logAndFail((s"""Unable to delete channel.
+                   |Failed to update Channel meta-data.
+                   |The channel ${deleteChannel} CANNOT be used!
+                   |Please run 'pio app channel-delete ${appDesc.app.name} ${deleteChannel}'""" +
+                    " to delete this channel again!").stripMargin)
+              }
+            } else {
+              logAndFail(s"""Unable to delete channel.
+                            |Error removing Event Store for this channel.""".stripMargin)
             }
-          } else {
-            logAndFail(s"""Unable to delete channel.
-                          |Error removing Event Store for this channel.""")
-          }
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
index 30c249b..ee8cd50 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
@@ -49,7 +49,7 @@ case class PioStatus(
   sparkHome: String = "",
   sparkVersion: String = "",
   sparkMinVersion: String = "",
-  warnings: Seq[String] = Seq())
+  warnings: Seq[String] = Nil)
 
 object Management extends EitherLogging {
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
index 4a72635..acd7598 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
@@ -49,12 +49,12 @@ case class ConsoleArgs(
   dashboard: DashboardArgs = DashboardArgs(),
   export: ExportArgs = ExportArgs(),
   imprt: ImportArgs = ImportArgs(),
-  commands: Seq[String] = Seq(),
+  commands: Seq[String] = Nil,
   metricsParamsJsonPath: Option[String] = None,
   paramsPath: String = "params",
   engineInstanceId: Option[String] = None,
   mainClass: Option[String] = None,
-  driverPassThrough: Seq[String] = Seq(),
+  driverPassThrough: Seq[String] = Nil,
   pioHome: Option[String] = None,
   verbose: Boolean = false)
 
@@ -69,7 +69,7 @@ case class AppArgs(
 
 case class AccessKeyArgs(
   accessKey: String = "",
-  events: Seq[String] = Seq())
+  events: Seq[String] = Nil)
 
 case class EngineInfo(
   engineId: String,
@@ -763,7 +763,7 @@ object Console extends Logging {
     }
   }
 
-  def help(commands: Seq[String] = Seq()): String = {
+  def help(commands: Seq[String] = Nil): String = {
     if (commands.isEmpty) {
       mainHelp
     } else {