You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:30 UTC

[20/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
new file mode 100644
index 0000000..c0a4ee2
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+import java.util.Random
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
+import org.apache.gearpump.util.FileUtils
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.Try
+/**
+ * Tests for [[LaunchCluster]]: the package checks performed by `submit`, the submission to
+ * YARN, and `saveConfig`. YarnConfig, YarnClient, FileSystem and AppMasterResolver are
+ * replaced with Mockito mocks in every test.
+ */
+class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  // Assigned in beforeAll and terminated in afterAll.
+  implicit var system: ActorSystem = null
+
+  val rand = new Random()
+
+  /** Returns an array of `size` random bytes. */
+  private def randomArray(size: Int): Array[Byte] = {
+    val array = new Array[Byte](size)
+    rand.nextBytes(array)
+    array
+  }
+
+  val appId = ApplicationId.newInstance(0L, 0)
+
+  // Config passed both to the test ActorSystem and to every LaunchCluster under test.
+  // NOTE(review): the key "package -path" below contains a space and looks like a typo for
+  // "package-path"; it is left untouched because it is part of the parsed config - confirm.
+  val akka = ConfigFactory.parseString(
+
+    """
+      |gearpump {
+      |  yarn {
+      |    client {
+      |      package -path = "/user/gearpump/gearpump.zip"
+      |    }
+      |
+      |    applicationmaster {
+      |      ## Memory of YarnAppMaster
+      |        command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |      queue = "default"
+      |    }
+      |
+      |    master {
+      |      ## Memory of master daemon
+      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
+      |      containers = "2"
+      |      memory = "512"
+      |      vcores = "1"
+      |    }
+      |
+      |    worker {
+      |      ## memory of worker daemon
+      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
+      |      containers = "4"
+      |      ## This also contains all memory for child executors.
+      |      memory = "4096"
+      |      vcores = "1"
+      |    }
+      |    services {
+      |      enabled = true
+      |    }
+      |  }
+      |}
+    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem(getClass.getSimpleName, akka)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "reject non-zip files" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    // The path does not end with ".zip".
+    val packagePath = "gearpump.zip2"
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "reject if we cannot find the package file on HDFS" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(false)
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "throw when package exists on HDFS, but the file is corrupted" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(true)
+
+    // Random bytes stand in for a package that is not a readable zip archive.
+    val content = new ByteArrayInputStream(randomArray(10))
+    when(fs.open(anyString)).thenReturn(content)
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+    content.close()
+  }
+
+  it should "throw when the HDFS package version is not consistent with local version" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
+      appMasterResolver, version)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(true)
+
+    // The package on the mocked file system carries a different version than the launcher.
+    val oldVersion = "gearpump-0.1"
+    when(fs.open(anyString)).thenReturn(zipInputStream(oldVersion))
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "upload config file to HDFS when submitting" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient,
+      fs, system, appMasterResolver, version)
+    val packagePath = "gearpump.zip"
+
+    val out = mock(classOf[OutputStream])
+    when(fs.exists(anyString)).thenReturn(true)
+    when(fs.create(anyString)).thenReturn(out)
+    when(fs.getHomeDirectory).thenReturn("/root")
+
+    when(fs.open(anyString)).thenReturn(zipInputStream(version))
+
+    val report = mock(classOf[ApplicationReport])
+    when(yarnClient.awaitApplication(any[ApplicationId], anyLong())).thenReturn(report)
+
+    when(report.getApplicationId).thenReturn(appId)
+    when(yarnClient.createApplication).thenReturn(appId)
+    assert(appId == launcher.submit("gearpump", packagePath))
+
+    // 3 Config files are uploaded to HDFS, one is akka.conf,
+    // one is yarn-site.xml, one is log4j.properties.
+    verify(fs, times(3)).create(anyString)
+    verify(out, times(3)).close()
+
+    // scalastyle:off line.size.limit
+    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster  -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    verify(yarnClient).submit("gearpump", appId, expectedCommand,
+      Resource.newInstance(512, 1), "default",
+      "gearpump.zip", "/root/.gearpump_application_0_0000/conf/")
+  }
+
+  it should "save active config from Gearpump cluster" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+    val appMaster = TestProbe()
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
+      appMasterResolver, version)
+    val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf")
+
+    try {
+      when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref)
+      val fileFuture = launcher.saveConfig(appId, outputPath.getPath)
+      // The probe plays the app master and answers the config request.
+      appMaster.expectMsgType[GetActiveConfig]
+      appMaster.reply(ActiveConfig(ConfigFactory.empty()))
+
+      import scala.concurrent.duration._
+      val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File]
+
+      assert(!FileUtils.read(file).isEmpty)
+      file.delete()
+    } finally {
+      // Remove the temp file even when an expectation above fails.
+      outputPath.delete()
+    }
+  }
+
+  /**
+   * Builds an in-memory zip archive with the single entry `<version>/README.md` and returns
+   * a stream over its bytes.
+   */
+  private def zipInputStream(version: String): InputStream = {
+    val bytes = new ByteArrayOutputStream(1000)
+    val zipOut = new ZipOutputStream(bytes)
+
+    zipOut.putNextEntry(new ZipEntry(s"$version/README.md"))
+    zipOut.write("README".getBytes())
+    zipOut.closeEntry()
+    zipOut.close()
+    new ByteArrayInputStream(bytes.toByteArray)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
new file mode 100644
index 0000000..01960ad
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.main.ParseResult
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
+import org.apache.gearpump.experiments.yarn.client.ManageCluster._
+import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.gearpump.util.FileUtils
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Tests for [[ManageCluster]]: each test issues one command, checks the message a TestProbe
+ * standing in for the app master receives, replies to it and checks the command's result.
+ */
+class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  // Assigned in beforeAll and terminated in afterAll.
+  implicit var system: ActorSystem = null
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "getConfig from remote Gearpump" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
+
+    try {
+      val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> output.toString),
+        Array.empty[String]))
+      appMaster.expectMsgType[GetActiveConfig]
+      appMaster.reply(ActiveConfig(ConfigFactory.empty()))
+      import scala.concurrent.duration._
+      Await.result(future, 30.seconds)
+
+      val content = FileUtils.read(output)
+      assert(content.length > 0)
+    } finally {
+      // Remove the temp file even when an expectation above fails.
+      output.delete()
+    }
+  }
+
+  it should "addworker" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 1.toString),
+      Array.empty[String]))
+    appMaster.expectMsg(AddWorker(1))
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+
+  it should "removeworker" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(REMOVE_WORKER, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(RemoveWorker("1"))
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+
+  it should "get version" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+    val future = manager.command(VERSION, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(QueryVersion)
+    appMaster.reply(Version("version 0.1"))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[Version]
+    assert(result.version == "version 0.1")
+  }
+
+  it should "get cluster info" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(QUERY, new ParseResult(Map.empty[String, String],
+      Array.empty[String]))
+    appMaster.expectMsg(QueryClusterInfo)
+    appMaster.reply(ClusterInfo(List("master"), List("worker")))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[ClusterInfo]
+    assert(result.masters.sameElements(List("master")))
+    assert(result.workers.sameElements(List("worker")))
+  }
+
+  it should "kill the cluster" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(KILL, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(Kill)
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
deleted file mode 100644
index 194c9a5..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.hadoop.lib.rotation.Rotation
-import io.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter}
-import io.gearpump.streaming.transaction.api.CheckpointStore
-import io.gearpump.util.LogUtil
-
-object HadoopCheckpointStore {
-  val LOG: Logger = LogUtil.getLogger(classOf[HadoopCheckpointStore])
-}
-
-/**
- * Stores timestamp-checkpoint mapping to Hadoop-compatible filesystem.
- *
- * Store file layout:
- * {{{
- * timestamp1, index1,
- * timestamp2, index2,
- * ...
- * timestampN, indexN
- * }}}
- *
- * @param dir directory holding the checkpoint store files
- * @param fs file system used to list, rename and delete store files
- * @param hadoopConfig configuration handed to the store reader and writer
- * @param rotation decides when the current store file is rotated
- */
-class HadoopCheckpointStore(
-    dir: Path,
-    fs: FileSystem,
-    hadoopConfig: Configuration,
-    rotation: Rotation)
-  extends CheckpointStore {
-
-  private[hadoop] var curTime = 0L
-  private[hadoop] var curStartTime = curTime
-  private[hadoop] var curFile: Option[String] = None
-  private[hadoop] var curWriter: Option[HadoopCheckpointStoreWriter] = None
-  // regex (checkpoints-$startTime-$endTime.store) for complete checkpoint file;
-  // the dot is escaped so that only a literal ".store" suffix matches
-  private val compRegex =
-    """checkpoints-(\d+)-(\d+)\.store""".r
-  // regex (checkpoints-$startTime.store) for temporary checkpoint file
-  private val tempRegex =
-    """checkpoints-(\d+)\.store""".r
-
-  /**
-   * Persists a pair of timestamp and checkpoint, which:
-   *
-   *   1. creates a temporary checkpoint file, checkpoints-\$startTime.store, if not exist
-   *   2. writes out (timestamp, checkpoint) and marks rotation
-   *   3. rotates checkpoint file if needed
-   *     a. renames temporary checkpoint file to checkpoints-\$startTime-\$endTime.store
-   *     b. closes current writer and reset
-   *     c. rotation rotates
-   *
-   * @param timestamp timestamp the checkpoint belongs to
-   * @param checkpoint serialized checkpoint
-   */
-  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
-    curTime = timestamp
-    if (curWriter.isEmpty) {
-      curStartTime = curTime
-      curFile = Some(s"checkpoints-$curStartTime.store")
-      curWriter = curFile.map(file =>
-        new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig))
-    }
-
-    curWriter.foreach { w =>
-      val offset = w.write(timestamp, checkpoint)
-      rotation.mark(timestamp, offset)
-    }
-
-    if (rotation.shouldRotate) {
-      curFile.foreach { f =>
-        fs.rename(new Path(dir, f), new Path(dir, s"checkpoints-$curStartTime-$curTime.store"))
-        curWriter.foreach(_.close())
-        curWriter = None
-      }
-      rotation.rotate
-    }
-  }
-
-  /**
-   * Recovers checkpoint given timestamp, which
-   *  {{{
-   *   1. returns None if no store exists
-   *   2. searches checkpoint stores for
-   *     a. complete store checkpoints-\$startTime-\$endTime.store
-   *         where startTime <= timestamp <= endTime
-   *     b. temporary store checkpoints-\$startTime.store
-   *         where startTime <= timestamp
-   *   3. renames store to checkpoints-\$startTime-\$endTime.store
-   *   4. deletes all stores whose name has a startTime larger than timestamp
-   *   5. looks for the checkpoint in the found store
-   *   }}}
-   *
-   * Directory entries whose name matches neither store pattern are ignored.
-   *
-   * @param timestamp timestamp to look up
-   * @return the checkpoint written for exactly `timestamp`, or None if there is none
-   */
-  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
-    if (!fs.exists(dir)) {
-      None
-    } else {
-      var checkpointFile: Option[Path] = None
-      fs.listStatus(dir).map(_.getPath).foreach { file =>
-        val fileName = file.getName
-        fileName match {
-          case compRegex(start, end) =>
-            val startTime = start.toLong
-            val endTime = end.toLong
-            if (timestamp >= startTime && timestamp <= endTime) {
-              checkpointFile = Some(new Path(dir, fileName))
-            } else if (timestamp < startTime) {
-              fs.delete(file, true)
-            }
-          case tempRegex(start) =>
-            val startTime = start.toLong
-            if (timestamp >= startTime) {
-              val newFile = new Path(dir, s"checkpoints-$startTime-$timestamp.store")
-              fs.rename(new Path(dir, fileName), newFile)
-              checkpointFile = Some(newFile)
-            }
-          case _ =>
-            // Not a checkpoint store: leave it alone instead of failing with a MatchError.
-        }
-      }
-
-      checkpointFile.flatMap { file =>
-        val reader = new HadoopCheckpointStoreReader(file, hadoopConfig)
-        try {
-          // Stops at the first entry written for exactly this timestamp.
-          reader.collectFirst { case (time, bytes) if time == timestamp => bytes }
-        } finally {
-          // Close the reader even when reading fails.
-          reader.close()
-        }
-      }
-    }
-  }
-
-  /** Closes the current writer, if there is one. */
-  override def close(): Unit = {
-    curWriter.foreach(_.close())
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
deleted file mode 100644
index 5a81ecd..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop
-
-import java.io.{ObjectInputStream, ObjectOutputStream}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
-
-object HadoopCheckpointStoreFactory {
-  // Becomes the "v<VERSION>" segment of every checkpoint directory path.
-  val VERSION = 1
-}
-
-/**
- * Creates a [[HadoopCheckpointStore]] for a task, rooted at a per-task directory below `dir`.
- *
- * The Hadoop configuration is a transient field and is written to and read from the
- * serialization stream by hand in `writeObject` / `readObject`.
- *
- * @param dir base directory of all checkpoint stores
- * @param hadoopConfig configuration used to resolve the file system and passed to the store
- * @param rotation rotation policy passed to every created store
- */
-class HadoopCheckpointStoreFactory(
-    dir: String,
-    @transient private var hadoopConfig: Configuration,
-    rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong))
-  extends CheckpointStoreFactory {
-
-  // Default fields first, then the transient Hadoop configuration.
-  private def writeObject(stream: ObjectOutputStream): Unit = {
-    stream.defaultWriteObject()
-    hadoopConfig.write(stream)
-  }
-
-  // Mirror of writeObject: default fields, then the configuration into a fresh instance.
-  private def readObject(stream: ObjectInputStream): Unit = {
-    stream.defaultReadObject()
-    hadoopConfig = new Configuration(false)
-    hadoopConfig.readFields(stream)
-  }
-
-  /**
-   * Builds the store for the task described by `taskContext`. The store directory is
-   * `<dir>/v<VERSION>/app<appId>-task<processorId>_<index>`.
-   */
-  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
-    val versionedRoot = s"$dir${Path.SEPARATOR}v${HadoopCheckpointStoreFactory.VERSION}"
-    val task = taskContext.taskId
-    val storeDir = new Path(versionedRoot,
-      s"app${taskContext.appId}-task${task.processorId}_${task.index}")
-    val fileSystem = HadoopUtil.getFileSystemForPath(storeDir, hadoopConfig)
-    new HadoopCheckpointStore(storeDir, fileSystem, hadoopConfig, rotation)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
deleted file mode 100644
index a07dbbc..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop
-
-import java.text.SimpleDateFormat
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hdfs.HdfsConfiguration
-import org.apache.hadoop.io.SequenceFile
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.format.{DefaultSequenceFormatter, OutputFormatter}
-import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.task.{TaskContext, TaskId}
-
-/**
- * Data sink that appends messages to Hadoop sequence files below `basePath`, starting a new
- * file whenever `rotation` asks for it.
- *
- * @param userConfig configuration passed to `HadoopUtil.login`
- * @param basePath parent directory of the per-task output directories
- * @param rotation decides when the current file is closed and a new one is started
- * @param sequenceFormat extracts the key and value (and their classes) from a message
- */
-class SequenceFileSink(
-    userConfig: UserConfig,
-    basePath: String,
-    rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong),
-    sequenceFormat: OutputFormatter = new DefaultSequenceFormatter)
-  extends DataSink{
-  @transient private lazy val configuration = new HdfsConfiguration()
-  private val dateFormat = new SimpleDateFormat("yyyy_MM_dd-HH-mm-ss")
-  private var writer: SequenceFile.Writer = null
-  private var taskId: TaskId = null
-  private var appName: String = null
-
-  /**
-   * Logs in, remembers the application name and task id of `context` and opens the first
-   * output file.
-   *
-   * Invoked at onStart() method of [[io.gearpump.streaming.task.Task]]
-   *
-   * @param context is the task context at runtime
-   */
-  override def open(context: TaskContext): Unit = {
-    HadoopUtil.login(userConfig, configuration)
-    appName = context.appName
-    taskId = context.taskId
-    writer = getNextWriter
-  }
-
-  /**
-   * Appends the key and value of `message` to the current file and switches to a new file
-   * when the rotation says so.
-   *
-   * Invoked at onNext() method of [[io.gearpump.streaming.task.Task]]
-   * @param message wraps data to be written out
-   */
-  override def write(message: Message): Unit = {
-    val recordKey = sequenceFormat.getKey(message)
-    val recordValue = sequenceFormat.getValue(message)
-    if (writer == null) {
-      writer = getNextWriter
-    }
-    writer.append(recordKey, recordValue)
-    rotation.mark(message.timestamp, writer.getLength)
-    if (rotation.shouldRotate) {
-      closeWriter()
-      writer = getNextWriter
-      rotation.rotate
-    }
-  }
-
-  /**
-   * Flushes and closes the current output file.
-   *
-   * Invoked at onClose() method of [[io.gearpump.streaming.task.Task]]
-   */
-  override def close(): Unit = {
-    closeWriter()
-  }
-
-  // Flushes and closes the current writer when there is one; the field itself is left as is.
-  private def closeWriter(): Unit = {
-    if (writer != null) {
-      writer.hflush()
-      writer.close()
-    }
-  }
-
-  // Opens a writer on the next output path using the formatter's key and value classes.
-  private def getNextWriter: SequenceFile.Writer = {
-    val conf = configuration
-    val target = SequenceFile.Writer.file(getNextFilePath)
-    val keyType = SequenceFile.Writer.keyClass(sequenceFormat.getKeyClass)
-    val valueType = SequenceFile.Writer.valueClass(sequenceFormat.getValueClass)
-    SequenceFile.createWriter(conf, target, keyType, valueType)
-  }
-
-  // <basePath>/<appName>-task<processorId>_<index>/<current time as yyyy_MM_dd-HH-mm-ss>
-  private def getNextFilePath: Path = {
-    val taskDir = new Path(basePath, s"$appName-task${taskId.processorId}_${taskId.index}")
-    val fileName = dateFormat.format(new java.util.Date)
-    new Path(taskDir, fileName)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
deleted file mode 100644
index 52acbac..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib
-
-import java.io.EOFException
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.TimeStamp
-
-class HadoopCheckpointStoreReader(
-    path: Path,
-    hadoopConfig: Configuration)
-  extends Iterator[(TimeStamp, Array[Byte])] {
-
-  private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
-  private var nextTimeStamp: Option[TimeStamp] = None
-  private var nextData: Option[Array[Byte]] = None
-
-  override def hasNext: Boolean = {
-    if (nextTimeStamp.isDefined) {
-      true
-    } else {
-      try {
-        nextTimeStamp = Some(stream.readLong())
-        val length = stream.readInt()
-        val buffer = new Array[Byte](length)
-        stream.readFully(buffer)
-        nextData = Some(buffer)
-        true
-      } catch {
-        case e: EOFException =>
-          close()
-          false
-        case e: Exception =>
-          close()
-          throw e
-      }
-    }
-  }
-
-  override def next(): (TimeStamp, Array[Byte]) = {
-    val timeAndData = for {
-      time <- nextTimeStamp
-      data <- nextData
-    } yield (time, data)
-    nextTimeStamp = None
-    nextData = None
-    timeAndData.get
-  }
-
-  def close(): Unit = {
-    stream.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
deleted file mode 100644
index 35f2f51..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.TimeStamp
-
-class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
-  private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig)
-
-  def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
-    stream.writeLong(timestamp)
-    stream.writeInt(data.length)
-    stream.write(data)
-    stream.hflush()
-    stream.getPos()
-  }
-
-  def close(): Unit = {
-    stream.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
deleted file mode 100644
index eb579e4..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop.lib
-
-import java.io.File
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.security.UserGroupInformation
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.{Constants, FileUtils}
-
-private[hadoop] object HadoopUtil {
-
-  def getOutputStream(path: Path, hadoopConfig: Configuration): FSDataOutputStream = {
-    val dfs = getFileSystemForPath(path, hadoopConfig)
-    val stream: FSDataOutputStream = {
-      if (dfs.isFile(path)) {
-        dfs.append(path)
-      } else {
-        dfs.create(path)
-      }
-    }
-    stream
-  }
-
-  def getInputStream(path: Path, hadoopConfig: Configuration): FSDataInputStream = {
-    val dfs = getFileSystemForPath(path, hadoopConfig)
-    val stream = dfs.open(path)
-    stream
-  }
-
-  def getFileSystemForPath(path: Path, hadoopConfig: Configuration): FileSystem = {
-    // For local file systems, return the raw local file system, such calls to flush()
-    // actually flushes the stream.
-    val fs = path.getFileSystem(hadoopConfig)
-    fs match {
-      case localFs: LocalFileSystem => localFs.getRawFileSystem
-      case _ => fs
-    }
-  }
-
-  def login(userConfig: UserConfig, configuration: Configuration): Unit = {
-    if (UserGroupInformation.isSecurityEnabled) {
-      val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
-      val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
-      if (principal.isEmpty || keytabContent.isEmpty) {
-        val errorMsg = s"HDFS is security enabled, user should provide kerberos principal in " +
-          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} " +
-          s"and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}"
-        throw new Exception(errorMsg)
-      }
-      val keytabFile = File.createTempFile("login", ".keytab")
-      FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get)
-      keytabFile.setExecutable(false)
-      keytabFile.setWritable(false)
-      keytabFile.setReadable(true, true)
-
-      UserGroupInformation.setConfiguration(configuration)
-      UserGroupInformation.loginUserFromKeytab(principal.get, keytabFile.getAbsolutePath)
-      keytabFile.delete()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
deleted file mode 100644
index d19e71f..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop.lib.format
-
-import org.apache.hadoop.io.{LongWritable, Text, Writable}
-
-import io.gearpump.Message
-
-class DefaultSequenceFormatter extends OutputFormatter {
-  override def getKey(message: Message): Writable = new LongWritable(message.timestamp)
-
-  override def getValue(message: Message): Writable = new Text(message.msg.asInstanceOf[String])
-
-  override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable]
-
-  override def getValueClass: Class[_ <: Writable] = classOf[Text]
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
deleted file mode 100644
index fe8e52e..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop.lib.format
-
-import org.apache.hadoop.io.Writable
-
-import io.gearpump.Message
-
-trait OutputFormatter extends Serializable {
-  def getKeyClass: Class[_ <: Writable]
-
-  def getValueClass: Class[_ <: Writable]
-
-  def getKey(message: Message): Writable
-
-  def getValue(message: Message): Writable
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
deleted file mode 100644
index cd83ea5..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib.rotation
-
-import io.gearpump.TimeStamp
-
-case class FileSizeRotation(maxBytes: Long) extends Rotation {
-
-  private var bytesWritten = 0L
-
-  override def mark(timestamp: TimeStamp, offset: Long): Unit = {
-    bytesWritten = offset
-  }
-
-  override def shouldRotate: Boolean = bytesWritten >= maxBytes
-
-  override def rotate(): Unit = {
-    bytesWritten = 0L
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
deleted file mode 100644
index e28b222..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib.rotation
-
-import io.gearpump.TimeStamp
-
-trait Rotation extends Serializable {
-  def mark(timestamp: TimeStamp, offset: Long): Unit
-  def shouldRotate: Boolean
-  def rotate(): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
new file mode 100644
index 0000000..a18cce6
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation
+import org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStore
+import org.apache.gearpump.util.LogUtil
+
+object HadoopCheckpointStore {
+  val LOG: Logger = LogUtil.getLogger(classOf[HadoopCheckpointStore])
+}
+
+/**
+ * Stores timestamp-checkpoint mapping to Hadoop-compatible filesystem.
+ *
+ * Checkpoints are appended to a temporary store, checkpoints-\$startTime.store, which is
+ * renamed to checkpoints-\$startTime-\$endTime.store when `rotation` asks for a new store.
+ *
+ * Store file layout:
+ * {{{
+ * timestamp1, checkpoint1,
+ * timestamp2, checkpoint2,
+ * ...
+ * timestampN, checkpointN
+ * }}}
+ *
+ * @param dir directory the checkpoint stores are kept in
+ * @param fs filesystem used to list, rename and delete the stores in `dir`
+ * @param hadoopConfig configuration passed to the store readers and writers
+ * @param rotation policy deciding when the current store is complete
+ */
+class HadoopCheckpointStore(
+    dir: Path,
+    fs: FileSystem,
+    hadoopConfig: Configuration,
+    rotation: Rotation)
+  extends CheckpointStore {
+
+  private[hadoop] var curTime = 0L
+  private[hadoop] var curStartTime = curTime
+  private[hadoop] var curFile: Option[String] = None
+  private[hadoop] var curWriter: Option[HadoopCheckpointStoreWriter] = None
+  // regex (checkpoints-$startTime-$endTime.store) for complete checkpoint file;
+  // the dot is escaped so that only a literal ".store" suffix is accepted
+  private val compRegex =
+    """checkpoints-(\d+)-(\d+)\.store""".r
+  // regex (checkpoints-$startTime.store) for temporary checkpoint file
+  private val tempRegex =
+    """checkpoints-(\d+)\.store""".r
+
+  /**
+   * Persists a pair of timestamp and checkpoint, which:
+   *
+   *   1. creates a temporary checkpoint file, checkpoints-\$startTime.store, if not exist
+   *   2. writes out (timestamp, checkpoint) and marks rotation
+   *   3. rotates checkpoint file if needed
+   *     a. renames temporary checkpoint file to checkpoints-\$startTime-\$endTime.store
+   *     b. closes current writer and reset
+   *     c. rotation rotates
+   *
+   * @param timestamp time the checkpoint belongs to
+   * @param checkpoint serialized checkpoint
+   */
+  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+    curTime = timestamp
+    if (curWriter.isEmpty) {
+      // First checkpoint of a new store: its start time becomes part of the file name
+      curStartTime = curTime
+      curFile = Some(s"checkpoints-$curStartTime.store")
+      curWriter = curFile.map(file =>
+        new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig))
+    }
+
+    curWriter.foreach { w =>
+      val offset = w.write(timestamp, checkpoint)
+      rotation.mark(timestamp, offset)
+    }
+
+    if (rotation.shouldRotate) {
+      curFile.foreach { f =>
+        fs.rename(new Path(dir, f), new Path(dir, s"checkpoints-$curStartTime-$curTime.store"))
+        curWriter.foreach(_.close())
+        curWriter = None
+      }
+      rotation.rotate()
+    }
+  }
+
+  /**
+   * Recovers checkpoint given timestamp, which
+   *  {{{
+   *   1. returns None if no store exists
+   *   2. searches checkpoint stores for
+   *     a. complete store checkpoints-\$startTime-\$endTime.store
+   *         where startTime <= timestamp <= endTime
+   *     b. temporary store checkpoints-\$startTime.store
+   *         where startTime <= timestamp
+   *   3. renames store to checkpoints-\$startTime-\$endTime.store
+   *   4. deletes all stores whose name has a startTime larger than timestamp
+   *   5. looks for the checkpoint in the found store
+   *   }}}
+   *
+   * Files in the directory which are not checkpoint stores are ignored.
+   *
+   * @param timestamp time to recover the checkpoint for
+   * @return the checkpoint persisted for exactly `timestamp`, or None if there is none
+   */
+  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+    if (!fs.exists(dir)) {
+      None
+    } else {
+      var checkpointFile: Option[Path] = None
+      fs.listStatus(dir).map(_.getPath).foreach { file =>
+        val fileName = file.getName
+        fileName match {
+          case compRegex(start, end) =>
+            val startTime = start.toLong
+            val endTime = end.toLong
+            if (timestamp >= startTime && timestamp <= endTime) {
+              checkpointFile = Some(new Path(dir, fileName))
+            } else if (timestamp < startTime) {
+              fs.delete(file, true)
+            }
+          case tempRegex(start) =>
+            val startTime = start.toLong
+            if (timestamp >= startTime) {
+              val newFile = new Path(dir, s"checkpoints-$startTime-$timestamp.store")
+              fs.rename(new Path(dir, fileName), newFile)
+              checkpointFile = Some(newFile)
+            }
+          case _ =>
+            // Not a checkpoint store; skip it instead of failing with a MatchError
+        }
+      }
+
+      checkpointFile.flatMap { file =>
+        val reader = new HadoopCheckpointStoreReader(file, hadoopConfig)
+        try {
+          // Scan forward and stop at the first record written for this timestamp
+          reader.find(_._1 == timestamp).map(_._2)
+        } finally {
+          reader.close()
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes the writer of the current store, if there is one.
+   *
+   * The temporary store keeps its name; it is not renamed here.
+   */
+  override def close(): Unit = {
+    curWriter.foreach(_.close())
+    // Forget the closed writer so that a later persist opens a fresh one
+    curWriter = None
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
new file mode 100644
index 0000000..e5e0f13
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
+import org.apache.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
+
+object HadoopCheckpointStoreFactory {
+  /** Names the version directory ("v" followed by this number) created under `dir`. */
+  val VERSION = 1
+}
+
+/**
+ * Factory of [[HadoopCheckpointStore]]s, one store directory per application task.
+ *
+ * @param dir root directory of all checkpoint stores
+ * @param hadoopConfig configuration used to resolve the filesystem and passed on to the stores;
+ *                     it is transient and serialized by hand in writeObject / readObject
+ * @param rotation rotation policy passed on to every created store
+ */
+class HadoopCheckpointStoreFactory(
+    dir: String,
+    @transient private var hadoopConfig: Configuration,
+    rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong))
+  extends CheckpointStoreFactory {
+
+  /** Writes the default fields, then the transient configuration. */
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    out.defaultWriteObject()
+    hadoopConfig.write(out)
+  }
+
+  /** Reads the default fields, then restores the configuration written by writeObject. */
+  private def readObject(in: ObjectInputStream): Unit = {
+    in.defaultReadObject()
+    hadoopConfig = new Configuration(false)
+    hadoopConfig.readFields(in)
+  }
+
+  /**
+   * Creates the checkpoint store of a task.
+   *
+   * The store lives in dir/v{VERSION}/app{appId}-task{processorId}_{index}.
+   *
+   * @param conf user configuration (not used here)
+   * @param taskContext supplies the application id and the task id naming the directory
+   */
+  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
+    val task = taskContext.taskId
+    val versionDir = s"$dir${Path.SEPARATOR}v${HadoopCheckpointStoreFactory.VERSION}"
+    val taskDirName = s"app${taskContext.appId}-task${task.processorId}_${task.index}"
+    val storeDir = new Path(versionDir, taskDirName)
+    val storeFs = HadoopUtil.getFileSystemForPath(storeDir, hadoopConfig)
+    new HadoopCheckpointStore(storeDir, storeFs, hadoopConfig, rotation)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
new file mode 100644
index 0000000..bb56003
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop
+
+import java.text.SimpleDateFormat
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.HdfsConfiguration
+import org.apache.hadoop.io.SequenceFile
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
+import org.apache.gearpump.streaming.hadoop.lib.format.{DefaultSequenceFormatter, OutputFormatter}
+import org.apache.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
+
+/**
+ * Data sink which appends messages to Hadoop sequence files.
+ *
+ * Files are created under basePath/{appName}-task{processorId}_{index}/ and are named after
+ * the time they were opened. A new file is started whenever `rotation` asks for it.
+ *
+ * @param userConfig user configuration handed to HadoopUtil.login
+ * @param basePath directory under which the per-task output directory is created
+ * @param rotation policy deciding when to switch to a new file
+ * @param sequenceFormat converts a message into the key and value written out
+ */
+class SequenceFileSink(
+    userConfig: UserConfig,
+    basePath: String,
+    rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong),
+    sequenceFormat: OutputFormatter = new DefaultSequenceFormatter)
+  extends DataSink {
+  @transient private lazy val configuration = new HdfsConfiguration()
+  private val dateFormat = new SimpleDateFormat("yyyy_MM_dd-HH-mm-ss")
+  // null while no file is open (before open(), and after the writer was closed)
+  private var writer: SequenceFile.Writer = null
+  private var taskId: TaskId = null
+  private var appName: String = null
+
+  /**
+   * Starts connection to data sink
+   *
+   * Invoked at onStart() method of [[org.apache.gearpump.streaming.task.Task]]
+   *
+   * @param context is the task context at runtime
+   */
+  override def open(context: TaskContext): Unit = {
+    HadoopUtil.login(userConfig, configuration)
+    this.appName = context.appName
+    this.taskId = context.taskId
+    this.writer = getNextWriter
+  }
+
+  /**
+   * Writes message into data sink
+   *
+   * Invoked at onNext() method of [[org.apache.gearpump.streaming.task.Task]]
+   * @param message wraps data to be written out
+   */
+  override def write(message: Message): Unit = {
+    val key = sequenceFormat.getKey(message)
+    val value = sequenceFormat.getValue(message)
+    if (writer == null) {
+      writer = getNextWriter
+    }
+    writer.append(key, value)
+    rotation.mark(message.timestamp, writer.getLength)
+    if (rotation.shouldRotate) {
+      closeWriter()
+      this.writer = getNextWriter
+      rotation.rotate()
+    }
+  }
+
+  /**
+   * Closes connection to data sink
+   *
+   * Invoked at onClose() method of [[org.apache.gearpump.streaming.task.Task]].
+   * Calling it again after the writer was closed does nothing.
+   */
+  override def close(): Unit = {
+    closeWriter()
+  }
+
+  /** Flushes and closes the current writer, if any, and forgets it. */
+  private def closeWriter(): Unit = {
+    Option(writer).foreach { w =>
+      // Drop the reference first: a closed (or failed) writer must never be reused
+      writer = null
+      try {
+        w.hflush()
+      } finally {
+        // Release the file even when the flush failed
+        w.close()
+      }
+    }
+  }
+
+  /** Opens a sequence file writer on the next file path. */
+  private def getNextWriter: SequenceFile.Writer = {
+    SequenceFile.createWriter(
+      configuration,
+      SequenceFile.Writer.file(getNextFilePath),
+      SequenceFile.Writer.keyClass(sequenceFormat.getKeyClass),
+      SequenceFile.Writer.valueClass(sequenceFormat.getValueClass)
+    )
+  }
+
+  /**
+   * Builds the path of the next file from the current time.
+   *
+   * NOTE(review): the file name has one-second resolution, so two files opened within the
+   * same second would get the same path - confirm the rotation in use cannot do that.
+   */
+  private def getNextFilePath: Path = {
+    val base = new Path(basePath, s"$appName-task${taskId.processorId}_${taskId.index}")
+    new Path(base, dateFormat.format(new java.util.Date))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
new file mode 100644
index 0000000..082e963
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib
+
+import java.io.EOFException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Iterates over the (timestamp, checkpoint) records of a checkpoint store file.
+ *
+ * Each record is read as a long timestamp, an int length and that many bytes of data.
+ * The underlying stream is closed as soon as its end is reached or reading fails.
+ *
+ * @param path checkpoint store file to read
+ * @param hadoopConfig configuration used to open the file
+ */
+class HadoopCheckpointStoreReader(
+    path: Path,
+    hadoopConfig: Configuration)
+  extends Iterator[(TimeStamp, Array[Byte])] {
+
+  private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
+  // Record read ahead by hasNext and not yet handed out by next()
+  private var nextRecord: Option[(TimeStamp, Array[Byte])] = None
+  // Set once the stream was closed; nothing is read from it afterwards
+  private var finished = false
+
+  /**
+   * Reads ahead one record if none is buffered.
+   *
+   * A record cut short by the end of the file is dropped as a whole, and the end of the
+   * stream is remembered, so repeated calls keep returning false.
+   *
+   * @return true if next() has a record to return
+   */
+  override def hasNext: Boolean = {
+    if (nextRecord.isEmpty && !finished) {
+      try {
+        val timestamp = stream.readLong()
+        val length = stream.readInt()
+        val buffer = new Array[Byte](length)
+        stream.readFully(buffer)
+        // Publish the record only after all of its parts were read
+        nextRecord = Some((timestamp, buffer))
+      } catch {
+        case _: EOFException =>
+          close()
+        case e: Exception =>
+          close()
+          throw e
+      }
+    }
+    nextRecord.isDefined
+  }
+
+  /**
+   * Returns the next record; hasNext does not have to be called first.
+   *
+   * @throws java.util.NoSuchElementException if there is no record left
+   */
+  override def next(): (TimeStamp, Array[Byte]) = {
+    val record = if (hasNext) nextRecord else None
+    nextRecord = None
+    record.getOrElse(throw new NoSuchElementException(s"no more checkpoints in $path"))
+  }
+
+  /** Closes the underlying stream; records are no longer read afterwards. */
+  def close(): Unit = {
+    finished = true
+    stream.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
new file mode 100644
index 0000000..11c12c4
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Writes (timestamp, payload) records to a checkpoint file.
+ *
+ * A record is laid out as a long timestamp, an int payload length and the payload bytes.
+ *
+ * @param path location of the checkpoint file to write
+ * @param hadoopConfig Hadoop configuration used to open the file
+ */
+class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
+  // Opened on first access rather than at construction time.
+  private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig)
+
+  /**
+   * Writes one record and flushes it with `hflush`.
+   *
+   * @param timestamp timestamp stored in front of the payload
+   * @param data payload bytes; their count is stored before the bytes themselves
+   * @return the stream position reported after the flush
+   */
+  def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
+    val out = stream
+    val payloadSize = data.length
+    out.writeLong(timestamp)
+    out.writeInt(payloadSize)
+    out.write(data)
+    out.hflush()
+    out.getPos()
+  }
+
+  /** Closes the output stream. */
+  def close(): Unit = stream.close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopUtil.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopUtil.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopUtil.scala
new file mode 100644
index 0000000..935b52c
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopUtil.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop.lib
+
+import java.io.File
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.util.{Constants, FileUtils}
+
+/** Helpers for opening Hadoop file streams and for Kerberos login. */
+private[hadoop] object HadoopUtil {
+
+  /**
+   * Opens `path` for writing.
+   *
+   * @return a stream that appends to `path` when it is an existing file, otherwise a stream
+   *         to a newly created file
+   */
+  def getOutputStream(path: Path, hadoopConfig: Configuration): FSDataOutputStream = {
+    val dfs = getFileSystemForPath(path, hadoopConfig)
+    if (dfs.isFile(path)) {
+      dfs.append(path)
+    } else {
+      dfs.create(path)
+    }
+  }
+
+  /** Opens `path` for reading. */
+  def getInputStream(path: Path, hadoopConfig: Configuration): FSDataInputStream = {
+    getFileSystemForPath(path, hadoopConfig).open(path)
+  }
+
+  /**
+   * Resolves the file system that owns `path`.
+   *
+   * @return the raw local file system when `path` resolves to a [[LocalFileSystem]],
+   *         otherwise the resolved file system itself
+   */
+  def getFileSystemForPath(path: Path, hadoopConfig: Configuration): FileSystem = {
+    // For local file systems, return the raw local file system, so that calls to flush()
+    // actually flush the stream.
+    val fs = path.getFileSystem(hadoopConfig)
+    fs match {
+      case localFs: LocalFileSystem => localFs.getRawFileSystem
+      case _ => fs
+    }
+  }
+
+  /**
+   * Logs in through Kerberos when Hadoop security is enabled; does nothing otherwise.
+   *
+   * The keytab bytes found in `userConfig` are written to a temporary file that is handed to
+   * `UserGroupInformation.loginUserFromKeytab` and removed afterwards, whether or not the
+   * login succeeds.
+   *
+   * @param userConfig source of the principal and of the keytab content
+   * @param configuration Hadoop configuration installed into `UserGroupInformation`
+   * @throws Exception if security is enabled but the principal or the keytab is missing
+   */
+  def login(userConfig: UserConfig, configuration: Configuration): Unit = {
+    if (UserGroupInformation.isSecurityEnabled) {
+      val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
+      val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
+      if (principal.isEmpty || keytabContent.isEmpty) {
+        val errorMsg = s"HDFS is security enabled, user should provide kerberos principal in " +
+          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} " +
+          s"and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}"
+        throw new Exception(errorMsg)
+      }
+      val keytabFile = File.createTempFile("login", ".keytab")
+      try {
+        // Drop read access for everybody and grant it back to the owner only, before the
+        // secret is written: setReadable(true, true) on its own would not revoke the access
+        // that other users got from the default creation mode.
+        keytabFile.setReadable(false, false)
+        keytabFile.setReadable(true, true)
+        FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get)
+        keytabFile.setExecutable(false)
+        keytabFile.setWritable(false)
+        keytabFile.setReadable(true, true)
+
+        UserGroupInformation.setConfiguration(configuration)
+        UserGroupInformation.loginUserFromKeytab(principal.get, keytabFile.getAbsolutePath)
+      } finally {
+        // Never leave the keytab on disk, even when writing it or logging in fails.
+        keytabFile.delete()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
new file mode 100644
index 0000000..318c071
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop.lib.format
+
+import org.apache.hadoop.io.{LongWritable, Text, Writable}
+
+import org.apache.gearpump.Message
+
+/**
+ * Formatter that turns a message into a [[LongWritable]] key holding its timestamp and a
+ * [[Text]] value holding its payload.
+ */
+class DefaultSequenceFormatter extends OutputFormatter {
+  override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable]
+
+  override def getValueClass: Class[_ <: Writable] = classOf[Text]
+
+  override def getKey(message: Message): Writable = {
+    val timestamp = message.timestamp
+    new LongWritable(timestamp)
+  }
+
+  override def getValue(message: Message): Writable = {
+    // The payload is cast, not converted, so a non-String msg fails with ClassCastException.
+    val payload = message.msg.asInstanceOf[String]
+    new Text(payload)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
new file mode 100644
index 0000000..435d0fc
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop.lib.format
+
+import org.apache.hadoop.io.Writable
+
+import org.apache.gearpump.Message
+
+/**
+ * Converts a [[Message]] into a Hadoop `Writable` key and value, and reports the classes
+ * of those two writables.
+ */
+trait OutputFormatter extends Serializable {
+  /** Class of the writable produced by `getKey`. */
+  def getKeyClass: Class[_ <: Writable]
+
+  /** Class of the writable produced by `getValue`. */
+  def getValueClass: Class[_ <: Writable]
+
+  /** Builds the key writable for `message`. */
+  def getKey(message: Message): Writable
+
+  /** Builds the value writable for `message`. */
+  def getValue(message: Message): Writable
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
new file mode 100644
index 0000000..72be9c3
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Rotation policy based on file size: rotation becomes due once the most recently marked
+ * offset reaches `maxBytes`.
+ *
+ * @param maxBytes offset at or beyond which `shouldRotate` reports true
+ */
+case class FileSizeRotation(maxBytes: Long) extends Rotation {
+
+  // Last offset handed to mark(); put back to zero by rotate().
+  private var bytesWritten = 0L
+
+  /** True once the recorded offset is at least `maxBytes`. */
+  override def shouldRotate: Boolean = {
+    bytesWritten >= maxBytes
+  }
+
+  /** Records `offset` as the current size; `timestamp` is not used by this policy. */
+  override def mark(timestamp: TimeStamp, offset: Long): Unit = {
+    bytesWritten = offset
+  }
+
+  /** Resets the recorded offset to zero. */
+  override def rotate(): Unit = {
+    bytesWritten = 0L
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
new file mode 100644
index 0000000..cd8c04a
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Policy that decides when the file being written should be rotated.
+ *
+ * NOTE(review): presumably the writer calls `mark` after each write, checks `shouldRotate`
+ * and calls `rotate()` once it has switched files -- confirm against the callers.
+ */
+trait Rotation extends Serializable {
+  /** Reports a timestamp and an offset to the policy. */
+  def mark(timestamp: TimeStamp, offset: Long): Unit
+  /** Whether a rotation is due according to what has been marked so far. */
+  def shouldRotate: Boolean
+  /** Tells the policy that a rotation took place. */
+  def rotate(): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
deleted file mode 100644
index cc8a5f0..0000000
--- a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
-import io.gearpump.streaming.task.TaskId
-
-class HadoopCheckpointStoreIntegrationSpec
-  extends PropSpec with PropertyChecks with MockitoSugar with Matchers {
-
-  property("HadoopCheckpointStore should persist and recover checkpoints") {
-    val fileSizeGen = Gen.chooseNum[Int](100, 1000)
-    forAll(fileSizeGen) { (fileSize: Int) =>
-      val userConfig = UserConfig.empty
-      val taskContext = MockUtil.mockTaskContext
-      val hadoopConfig = new Configuration()
-
-      when(taskContext.appId).thenReturn(0)
-      when(taskContext.taskId).thenReturn(TaskId(0, 0))
-
-      val rootDirName = "test"
-      val rootDir = new Path(rootDirName + Path.SEPARATOR +
-        s"v${HadoopCheckpointStoreFactory.VERSION}")
-      val subDir = new Path(rootDir, "app0-task0_0")
-
-      val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig)
-      fs.delete(rootDir, true)
-      fs.exists(rootDir) shouldBe false
-
-      val checkpointStoreFactory = new HadoopCheckpointStoreFactory(
-        rootDirName, hadoopConfig, new FileSizeRotation(fileSize))
-      val checkpointStore = checkpointStoreFactory.getCheckpointStore(userConfig, taskContext)
-
-      checkpointStore.persist(0L, Array(0.toByte))
-
-      val tempFile = new Path(subDir, "checkpoints-0.store")
-      fs.exists(tempFile) shouldBe true
-
-      checkpointStore.persist(1L, Array.fill(fileSize)(0.toByte))
-      fs.exists(tempFile) shouldBe false
-      fs.exists(new Path(subDir, "checkpoints-0-1.store")) shouldBe true
-
-      checkpointStore.persist(2L, Array(0.toByte))
-      val newTempFile = new Path(subDir, "checkpoints-2.store")
-      fs.exists(newTempFile) shouldBe true
-
-      for (i <- 0 to 2) {
-        val optCp = checkpointStore.recover(i)
-        optCp should not be empty
-      }
-      fs.exists(newTempFile) shouldBe false
-      fs.exists(new Path(subDir, "checkpoints-2-2.store")) shouldBe true
-
-      checkpointStore.close()
-      fs.delete(rootDir, true)
-      fs.close()
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
deleted file mode 100644
index 9b4057c..0000000
--- a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib.rotation
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.TimeStamp
-
-class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
-
-  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
-  val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue)
-
-  property("FileSize rotation rotates on file size") {
-    forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) =>
-      val rotation = new FileSizeRotation(fileSize)
-      rotation.shouldRotate shouldBe false
-      rotation.mark(timestamp, rotation.maxBytes / 2)
-      rotation.shouldRotate shouldBe false
-      rotation.mark(timestamp, rotation.maxBytes)
-      rotation.shouldRotate shouldBe true
-      rotation.rotate
-      rotation.shouldRotate shouldBe false
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
new file mode 100644
index 0000000..4fd8dc1
--- /dev/null
+++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.task.TaskId
+
+/**
+ * Property-based integration test that drives a checkpoint store obtained from
+ * HadoopCheckpointStoreFactory against a real file system, using FileSizeRotation with
+ * generated size limits, and checks which checkpoint files exist after each step.
+ */
+class HadoopCheckpointStoreIntegrationSpec
+  extends PropSpec with PropertyChecks with MockitoSugar with Matchers {
+
+  property("HadoopCheckpointStore should persist and recover checkpoints") {
+    // Rotation threshold, in bytes, tried by the property.
+    val fileSizeGen = Gen.chooseNum[Int](100, 1000)
+    forAll(fileSizeGen) { (fileSize: Int) =>
+      val userConfig = UserConfig.empty
+      val taskContext = MockUtil.mockTaskContext
+      val hadoopConfig = new Configuration()
+
+      when(taskContext.appId).thenReturn(0)
+      when(taskContext.taskId).thenReturn(TaskId(0, 0))
+
+      val rootDirName = "test"
+      val rootDir = new Path(rootDirName + Path.SEPARATOR +
+        s"v${HadoopCheckpointStoreFactory.VERSION}")
+      // NOTE(review): presumably derived by the store from the mocked appId and taskId -- confirm.
+      val subDir = new Path(rootDir, "app0-task0_0")
+
+      // Start from a clean directory so that earlier runs cannot influence the assertions.
+      val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig)
+      fs.delete(rootDir, true)
+      fs.exists(rootDir) shouldBe false
+
+      val checkpointStoreFactory = new HadoopCheckpointStoreFactory(
+        rootDirName, hadoopConfig, new FileSizeRotation(fileSize))
+      val checkpointStore = checkpointStoreFactory.getCheckpointStore(userConfig, taskContext)
+
+      // A first, one-byte checkpoint is expected to show up in checkpoints-0.store.
+      checkpointStore.persist(0L, Array(0.toByte))
+
+      val tempFile = new Path(subDir, "checkpoints-0.store")
+      fs.exists(tempFile) shouldBe true
+
+      // A payload of fileSize bytes is expected to replace that file by checkpoints-0-1.store.
+      checkpointStore.persist(1L, Array.fill(fileSize)(0.toByte))
+      fs.exists(tempFile) shouldBe false
+      fs.exists(new Path(subDir, "checkpoints-0-1.store")) shouldBe true
+
+      // The next checkpoint is expected in a new file named after its own timestamp.
+      checkpointStore.persist(2L, Array(0.toByte))
+      val newTempFile = new Path(subDir, "checkpoints-2.store")
+      fs.exists(newTempFile) shouldBe true
+
+      // Every persisted timestamp must be recoverable.
+      for (i <- 0 to 2) {
+        val optCp = checkpointStore.recover(i)
+        optCp should not be empty
+      }
+      // After the recover calls the open file is expected under its final name.
+      fs.exists(newTempFile) shouldBe false
+      fs.exists(new Path(subDir, "checkpoints-2-2.store")) shouldBe true
+
+      // Clean up the files created by this iteration.
+      checkpointStore.close()
+      fs.delete(rootDir, true)
+      fs.close()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
new file mode 100644
index 0000000..4eab3c9
--- /dev/null
+++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+
+/** Property-based test of the size threshold applied by [[FileSizeRotation]]. */
+class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
+
+  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+  val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue)
+
+  property("FileSize rotation rotates on file size") {
+    forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) =>
+      val rotation = new FileSizeRotation(fileSize)
+      // Nothing has been marked yet.
+      rotation.shouldRotate shouldBe false
+      // Half of the limit is still below the threshold.
+      rotation.mark(timestamp, rotation.maxBytes / 2)
+      rotation.shouldRotate shouldBe false
+      // Reaching the limit exactly is enough to trigger a rotation.
+      rotation.mark(timestamp, rotation.maxBytes)
+      rotation.shouldRotate shouldBe true
+      // rotate() has a side effect, so it is called with explicit parentheses.
+      rotation.rotate()
+      rotation.shouldRotate shouldBe false
+    }
+  }
+}