You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:30 UTC
[20/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
new file mode 100644
index 0000000..c0a4ee2
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+import java.util.Random
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
+import org.apache.gearpump.util.FileUtils
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.Try
+class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  implicit var system: ActorSystem = null // created in beforeAll(), terminated in afterAll()
+
+  val rand = new Random()
+  /** Returns `size` random bytes; the corrupted-package test serves them as a non-zip file. */
+  private def randomArray(size: Int): Array[Byte] = {
+    val array = new Array[Byte](size)
+    rand.nextBytes(array)
+    array
+  }
+  val appId = ApplicationId.newInstance(0L, 0)
+
+  val akka = ConfigFactory.parseString(
+
+    """
+      |gearpump {
+      |  yarn {
+      |    client {
+      |      package-path = "/user/gearpump/gearpump.zip"
+      |    }
+      |
+      |    applicationmaster {
+      |      ## Memory of YarnAppMaster
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |      queue = "default"
+      |    }
+      |
+      |    master {
+      |      ## Memory of master daemon
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      containers = "2"
+      |      memory = "512"
+      |      vcores = "1"
+      |    }
+      |
+      |    worker {
+      |      ## memory of worker daemon
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      containers = "4"
+      |      ## This also contains all memory for child executors.
+      |      memory = "4096"
+      |      vcores = "1"
+      |    }
+      |    services {
+      |      enabled = true
+      |    }
+      |  }
+      |}
+    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem(getClass.getSimpleName, akka)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "reject non-zip files" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    val packagePath = "gearpump.zip2" // extension is not ".zip"
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "reject if we cannot find the package file on HDFS" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(false)
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "throw when package exists on HDFS, but the file is corrupted" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(true)
+
+    val content = new ByteArrayInputStream(randomArray(10))
+    when(fs.open(anyString)).thenReturn(content)
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+    content.close()
+  }
+
+  it should "throw when the HDFS package version is not consistent with local version" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
+      appMasterResolver, version)
+    val packagePath = "gearpump.zip"
+    when(fs.exists(anyString)).thenReturn(true)
+
+    val oldVersion = "gearpump-0.1"
+    when(fs.open(anyString)).thenReturn(zipInputStream(oldVersion))
+    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
+  }
+
+  it should "upload config file to HDFS when submitting" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient,
+      fs, system, appMasterResolver, version)
+    val packagePath = "gearpump.zip"
+
+    val out = mock(classOf[OutputStream])
+    when(fs.exists(anyString)).thenReturn(true)
+    when(fs.create(anyString)).thenReturn(out)
+    when(fs.getHomeDirectory).thenReturn("/root")
+
+    when(fs.open(anyString)).thenReturn(zipInputStream(version))
+
+    val report = mock(classOf[ApplicationReport])
+    when(yarnClient.awaitApplication(any[ApplicationId], anyLong())).thenReturn(report)
+
+    when(report.getApplicationId).thenReturn(appId)
+    when(yarnClient.createApplication).thenReturn(appId)
+    assert(appId == launcher.submit("gearpump", packagePath))
+
+    // 3 config files are uploaded to HDFS: akka.conf, yarn-site.xml and log4j.properties;
+    // every stream returned by fs.create must be closed.
+    verify(fs, times(3)).create(anyString)
+    verify(out, times(3)).close()
+
+    // Expected YarnAppMaster launch command for version "gearpump-0.2" and the config above.
+    // scalastyle:off line.size.limit
+    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    verify(yarnClient).submit("gearpump", appId, expectedCommand,
+      Resource.newInstance(512, 1), "default",
+      "gearpump.zip", "/root/.gearpump_application_0_0000/conf/")
+  }
+
+  it should "save active config from Gearpump cluster" in {
+    val yarnConfig = mock(classOf[YarnConfig])
+    val yarnClient = mock(classOf[YarnClient])
+    val fs = mock(classOf[FileSystem])
+    val appMasterResolver = mock(classOf[AppMasterResolver])
+    val appMaster = TestProbe()
+
+    val version = "gearpump-0.2"
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
+      appMasterResolver, version)
+    val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf")
+    outputPath.deleteOnExit() // clean up even if an assertion below fails
+    when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref)
+    val fileFuture = launcher.saveConfig(appId, outputPath.getPath)
+    appMaster.expectMsgType[GetActiveConfig]
+    appMaster.reply(ActiveConfig(ConfigFactory.empty()))
+
+    import scala.concurrent.duration._
+    val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File]
+
+    assert(!FileUtils.read(file).isEmpty)
+    file.delete()
+  }
+  /** Builds an in-memory zip whose only entry is `$version/README.md`. */
+  private def zipInputStream(version: String): InputStream = {
+    val bytes = new ByteArrayOutputStream(1000)
+    val zipOut = new ZipOutputStream(bytes)
+
+    // The top-level directory name stands in for the package version the tests compare.
+    zipOut.putNextEntry(new ZipEntry(s"$version/README.md"))
+    zipOut.write("README".getBytes())
+    // Finish the entry and the archive before the bytes are read back.
+    zipOut.closeEntry()
+    zipOut.close()
+    new ByteArrayInputStream(bytes.toByteArray)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
new file mode 100644
index 0000000..01960ad
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/client/ManageClusterSpec.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.main.ParseResult
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
+import org.apache.gearpump.experiments.yarn.client.ManageCluster._
+import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.gearpump.util.FileUtils
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  implicit var system: ActorSystem = null // created in beforeAll(), terminated in afterAll()
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "getConfig from remote Gearpump" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
+    output.deleteOnExit() // clean up even if an assertion below fails
+    val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> output.toString),
+      Array.empty[String]))
+    appMaster.expectMsgType[GetActiveConfig]
+    appMaster.reply(ActiveConfig(ConfigFactory.empty()))
+    import scala.concurrent.duration._
+    Await.result(future, 30.seconds)
+
+    val content = FileUtils.read(output)
+    assert(content.length > 0)
+    output.delete()
+  }
+
+  it should "addworker" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 1.toString),
+      Array.empty[String]))
+    appMaster.expectMsg(AddWorker(1))
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+
+  it should "removeworker" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    val future = manager.command(REMOVE_WORKER, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(RemoveWorker("1"))
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+
+  it should "get version" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+    val future = manager.command(VERSION, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(QueryVersion)
+    appMaster.reply(Version("version 0.1"))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[Version]
+    assert(result.version == "version 0.1")
+  }
+
+  it should "get cluster info" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    // QUERY yields the ClusterInfo reply directly, so no output file is needed.
+
+    val future = manager.command(QUERY, new ParseResult(Map.empty[String, String],
+      Array.empty[String]))
+    appMaster.expectMsg(QueryClusterInfo)
+    appMaster.reply(ClusterInfo(List("master"), List("worker")))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[ClusterInfo]
+    assert(result.masters.sameElements(List("master")))
+    assert(result.workers.sameElements(List("worker")))
+  }
+
+  it should "kill the cluster" in {
+    val appId = ApplicationId.newInstance(0L, 0)
+    val appMaster = TestProbe()
+    val manager = new ManageCluster(appId, appMaster.ref, system)
+
+    // KILL yields a CommandResult reply, so no output file is needed.
+
+    val future = manager.command(KILL, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
+    appMaster.expectMsg(Kill)
+    appMaster.reply(CommandResult(true))
+    import scala.concurrent.duration._
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
+    assert(result.success)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
deleted file mode 100644
index 194c9a5..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.hadoop.lib.rotation.Rotation
-import io.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter}
-import io.gearpump.streaming.transaction.api.CheckpointStore
-import io.gearpump.util.LogUtil
-
-object HadoopCheckpointStore {
- val LOG: Logger = LogUtil.getLogger(classOf[HadoopCheckpointStore])
-}
-
-/**
- * Stores timestamp-checkpoint mapping to Hadoop-compatible filesystem.
- *
- * Store file layout:
- * {{{
- * timestamp1, index1,
- * timestamp2, index2,
- * ...
- * timestampN, indexN
- * }}}
- */
-class HadoopCheckpointStore(
- dir: Path,
- fs: FileSystem,
- hadoopConfig: Configuration,
- rotation: Rotation)
- extends CheckpointStore {
-
- private[hadoop] var curTime = 0L
- private[hadoop] var curStartTime = curTime
- private[hadoop] var curFile: Option[String] = None
- private[hadoop] var curWriter: Option[HadoopCheckpointStoreWriter] = None
- // regex (checkpoints-$startTime-$endTime.store) for complete checkpoint file,
- private val compRegex =
- """checkpoints-(\d+)-(\d+).store""".r
- // regex (checkpoints-$startTime.store) for temporary checkpoint file
- private val tempRegex =
- """checkpoints-(\d+).store""".r
-
- /**
- * Persists a pair of timestamp and checkpoint, which:
- *
- * 1. creates a temporary checkpoint file, checkpoints-\$startTime.store, if not exist
- * 2. writes out (timestamp, checkpoint) and marks rotation
- * 3. rotates checkpoint file if needed
- * a. renames temporary checkpoint file to checkpoints-\$startTime-\$endTime.store
- * b. closes current writer and reset
- * c. rotation rotates
- */
- override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
- curTime = timestamp
- if (curWriter.isEmpty) {
- curStartTime = curTime
- curFile = Some(s"checkpoints-$curStartTime.store")
- curWriter = curFile.map(file =>
- new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig))
- }
-
- curWriter.foreach { w =>
- val offset = w.write(timestamp, checkpoint)
- rotation.mark(timestamp, offset)
- }
-
- if (rotation.shouldRotate) {
- curFile.foreach { f =>
- fs.rename(new Path(dir, f), new Path(dir, s"checkpoints-$curStartTime-$curTime.store"))
- curWriter.foreach(_.close())
- curWriter = None
- }
- rotation.rotate
- }
- }
-
- /**
- * Recovers checkpoint given timestamp, which
- * {{{
- * 1. returns None if no store exists
- * 2. searches checkpoint stores for
- * a. complete store checkpoints-\$startTime-\$endTime.store
- * where startTime <= timestamp <= endTime
- * b. temporary store checkpoints-\$startTime.store
- * where startTime <= timestamp
- * 3. renames store to checkpoints-\$startTime-\$endTime.store
- * 4. deletes all stores whose name has a startTime larger than timestamp
- * 5. looks for the checkpoint in the found store
- * }}}
- */
- override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
- var checkpoint: Option[Array[Byte]] = None
-
- if (fs.exists(dir)) {
- var checkpointFile: Option[Path] = None
- fs.listStatus(dir).map(_.getPath).foreach { file =>
- val fileName = file.getName
- fileName match {
- case compRegex(start, end) =>
- val startTime = start.toLong
- val endTime = end.toLong
- if (timestamp >= startTime && timestamp <= endTime) {
- checkpointFile = Some(new Path(dir, fileName))
- } else if (timestamp < startTime) {
- fs.delete(file, true)
- }
- case tempRegex(start) =>
- val startTime = start.toLong
- if (timestamp >= startTime) {
- val newFile = new Path(dir, s"checkpoints-$startTime-$timestamp.store")
- fs.rename(new Path(dir, fileName), newFile)
- checkpointFile = Some(newFile)
- }
- }
- }
-
- checkpointFile.foreach { file =>
- val reader = new HadoopCheckpointStoreReader(file, hadoopConfig)
-
- @annotation.tailrec
- def read: Option[Array[Byte]] = {
- if (reader.hasNext) {
- val (time, bytes) = reader.next()
- if (time == timestamp) {
- Some(bytes)
- } else {
- read
- }
- } else {
- None
- }
- }
- checkpoint = read
- reader.close()
- }
- }
- checkpoint
- }
-
- override def close(): Unit = {
- curWriter.foreach(_.close())
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
deleted file mode 100644
index 5a81ecd..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop
-
-import java.io.{ObjectInputStream, ObjectOutputStream}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
-
-object HadoopCheckpointStoreFactory {
- val VERSION = 1
-}
-
-class HadoopCheckpointStoreFactory(
- dir: String,
- @transient private var hadoopConfig: Configuration,
- rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong))
- extends CheckpointStoreFactory {
- import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory._
-
- private def writeObject(out: ObjectOutputStream): Unit = {
- out.defaultWriteObject()
- hadoopConfig.write(out)
- }
-
- private def readObject(in: ObjectInputStream): Unit = {
- in.defaultReadObject()
- hadoopConfig = new Configuration(false)
- hadoopConfig.readFields(in)
- }
-
- override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
- import taskContext.{appId, taskId}
- val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION",
- s"app$appId-task${taskId.processorId}_${taskId.index}")
- val fs = HadoopUtil.getFileSystemForPath(dirPath, hadoopConfig)
- new HadoopCheckpointStore(dirPath, fs, hadoopConfig, rotation)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
deleted file mode 100644
index a07dbbc..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop
-
-import java.text.SimpleDateFormat
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hdfs.HdfsConfiguration
-import org.apache.hadoop.io.SequenceFile
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.format.{DefaultSequenceFormatter, OutputFormatter}
-import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.task.{TaskContext, TaskId}
-
-class SequenceFileSink(
- userConfig: UserConfig,
- basePath: String,
- rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong),
- sequenceFormat: OutputFormatter = new DefaultSequenceFormatter)
- extends DataSink{
- @transient private lazy val configuration = new HdfsConfiguration()
- private val dateFormat = new SimpleDateFormat("yyyy_MM_dd-HH-mm-ss")
- private var writer: SequenceFile.Writer = null
- private var taskId: TaskId = null
- private var appName: String = null
-
- /**
- * Starts connection to data sink
- *
- * Invoked at onStart() method of [[io.gearpump.streaming.task.Task]]
- *
- * @param context is the task context at runtime
- */
- override def open(context: TaskContext): Unit = {
- HadoopUtil.login(userConfig, configuration)
- this.appName = context.appName
- this.taskId = context.taskId
- this.writer = getNextWriter
- }
-
- /**
- * Writes message into data sink
- *
- * Invoked at onNext() method of [[io.gearpump.streaming.task.Task]]
- * @param message wraps data to be written out
- */
- override def write(message: Message): Unit = {
- val key = sequenceFormat.getKey(message)
- val value = sequenceFormat.getValue(message)
- if (writer == null) {
- writer = getNextWriter
- }
- writer.append(key, value)
- rotation.mark(message.timestamp, writer.getLength)
- if (rotation.shouldRotate) {
- closeWriter
- this.writer = getNextWriter
- rotation.rotate
- }
- }
-
- /**
- * Closes connection to data sink
- *
- * Invoked at onClose() method of [[io.gearpump.streaming.task.Task]]
- */
- override def close(): Unit = {
- closeWriter()
- }
-
- private def closeWriter(): Unit = {
- Option(writer).foreach { w =>
- w.hflush()
- w.close()
- }
- }
-
- private def getNextWriter: SequenceFile.Writer = {
- SequenceFile.createWriter(
- configuration,
- SequenceFile.Writer.file(getNextFilePath),
- SequenceFile.Writer.keyClass(sequenceFormat.getKeyClass),
- SequenceFile.Writer.valueClass(sequenceFormat.getValueClass)
- )
- }
-
- private def getNextFilePath: Path = {
- val base = new Path(basePath, s"$appName-task${taskId.processorId}_${taskId.index}")
- new Path(base, dateFormat.format(new java.util.Date))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
deleted file mode 100644
index 52acbac..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib
-
-import java.io.EOFException
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.TimeStamp
-
-class HadoopCheckpointStoreReader(
- path: Path,
- hadoopConfig: Configuration)
- extends Iterator[(TimeStamp, Array[Byte])] {
-
- private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
- private var nextTimeStamp: Option[TimeStamp] = None
- private var nextData: Option[Array[Byte]] = None
-
- override def hasNext: Boolean = {
- if (nextTimeStamp.isDefined) {
- true
- } else {
- try {
- nextTimeStamp = Some(stream.readLong())
- val length = stream.readInt()
- val buffer = new Array[Byte](length)
- stream.readFully(buffer)
- nextData = Some(buffer)
- true
- } catch {
- case e: EOFException =>
- close()
- false
- case e: Exception =>
- close()
- throw e
- }
- }
- }
-
- override def next(): (TimeStamp, Array[Byte]) = {
- val timeAndData = for {
- time <- nextTimeStamp
- data <- nextData
- } yield (time, data)
- nextTimeStamp = None
- nextData = None
- timeAndData.get
- }
-
- def close(): Unit = {
- stream.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
deleted file mode 100644
index 35f2f51..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.TimeStamp
-
-/**
- * Appends (timestamp, checkpoint) records to a checkpoint store file: a Long timestamp,
- * an Int length, then the checkpoint bytes.
- */
-class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
- // Opened on first use; HadoopUtil.getOutputStream appends to an existing file or creates one.
- private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig)
-
- /**
- * Writes one record, hflushes it and returns the stream position after the write.
- */
- def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
- stream.writeLong(timestamp)
- stream.writeInt(data.length)
- stream.write(data)
- stream.hflush()
- stream.getPos()
- }
-
- /**
- * Closes the stream.
- * NOTE(review): because stream is lazy, closing a writer that never wrote opens
- * (and possibly creates) the file first.
- */
- def close(): Unit = {
- stream.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
deleted file mode 100644
index eb579e4..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop.lib
-
-import java.io.File
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.security.UserGroupInformation
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.{Constants, FileUtils}
-
-/** File system and security helpers shared by the Hadoop sink and checkpoint store. */
-private[hadoop] object HadoopUtil {
-
- /** Opens `path` for appending if it is an existing file, otherwise creates it. */
- def getOutputStream(path: Path, hadoopConfig: Configuration): FSDataOutputStream = {
- val dfs = getFileSystemForPath(path, hadoopConfig)
- val stream: FSDataOutputStream = {
- if (dfs.isFile(path)) {
- dfs.append(path)
- } else {
- dfs.create(path)
- }
- }
- stream
- }
-
- /** Opens `path` for reading. */
- def getInputStream(path: Path, hadoopConfig: Configuration): FSDataInputStream = {
- val dfs = getFileSystemForPath(path, hadoopConfig)
- val stream = dfs.open(path)
- stream
- }
-
- /** Resolves the file system for `path`, unwrapping LocalFileSystem to its raw form. */
- def getFileSystemForPath(path: Path, hadoopConfig: Configuration): FileSystem = {
- // For local file systems, return the raw local file system, so that calls to flush()
- // actually flush the stream.
- val fs = path.getFileSystem(hadoopConfig)
- fs match {
- case localFs: LocalFileSystem => localFs.getRawFileSystem
- case _ => fs
- }
- }
-
- /**
- * Logs in from a keytab when Hadoop security is enabled; does nothing otherwise.
- *
- * The principal and keytab bytes are read from `userConfig`. The keytab is written to a
- * temporary file for UserGroupInformation.loginUserFromKeytab, and the file is deleted afterwards.
- *
- * @throws Exception if security is enabled but the principal or keytab is missing
- */
- def login(userConfig: UserConfig, configuration: Configuration): Unit = {
- if (UserGroupInformation.isSecurityEnabled) {
- val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
- val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
- if (principal.isEmpty || keytabContent.isEmpty) {
- val errorMsg = s"HDFS is security enabled, user should provide kerberos principal in " +
- s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} " +
- s"and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}"
- throw new Exception(errorMsg)
- }
- // NOTE(review): the keytab is written before the permissions below are restricted, and the
- // file is not deleted if the login throws; consider a try/finally.
- val keytabFile = File.createTempFile("login", ".keytab")
- FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get)
- keytabFile.setExecutable(false)
- keytabFile.setWritable(false)
- keytabFile.setReadable(true, true)
-
- UserGroupInformation.setConfiguration(configuration)
- UserGroupInformation.loginUserFromKeytab(principal.get, keytabFile.getAbsolutePath)
- keytabFile.delete()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
deleted file mode 100644
index d19e71f..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop.lib.format
-
-import org.apache.hadoop.io.{LongWritable, Text, Writable}
-
-import io.gearpump.Message
-
-/**
- * Default formatter: the message timestamp becomes a LongWritable key and the message
- * payload becomes a Text value.
- * NOTE(review): the payload is cast with asInstanceOf[String], so non-String messages fail.
- */
-class DefaultSequenceFormatter extends OutputFormatter {
- override def getKey(message: Message): Writable = new LongWritable(message.timestamp)
-
- override def getValue(message: Message): Writable = new Text(message.msg.asInstanceOf[String])
-
- override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable]
-
- override def getValueClass: Class[_ <: Writable] = classOf[Text]
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
deleted file mode 100644
index fe8e52e..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.hadoop.lib.format
-
-import org.apache.hadoop.io.Writable
-
-import io.gearpump.Message
-
-/** Converts messages into SequenceFile key/value Writables and declares their classes. */
-trait OutputFormatter extends Serializable {
- /** Class of the keys produced by getKey. */
- def getKeyClass: Class[_ <: Writable]
-
- /** Class of the values produced by getValue. */
- def getValueClass: Class[_ <: Writable]
-
- /** Key to write for `message`. */
- def getKey(message: Message): Writable
-
- /** Value to write for `message`. */
- def getValue(message: Message): Writable
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
deleted file mode 100644
index cd83ea5..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib.rotation
-
-import io.gearpump.TimeStamp
-
-/**
- * Rotation that fires once the last marked offset reaches `maxBytes`.
- *
- * @param maxBytes offset threshold at or above which shouldRotate becomes true
- */
-case class FileSizeRotation(maxBytes: Long) extends Rotation {
-
- // Last offset passed to mark(); reset to 0 by rotate().
- private var bytesWritten = 0L
-
- override def mark(timestamp: TimeStamp, offset: Long): Unit = {
- bytesWritten = offset
- }
-
- override def shouldRotate: Boolean = bytesWritten >= maxBytes
-
- override def rotate(): Unit = {
- bytesWritten = 0L
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
deleted file mode 100644
index e28b222..0000000
--- a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib.rotation
-
-import io.gearpump.TimeStamp
-
-/** Policy deciding when an output file should be rotated. */
-trait Rotation extends Serializable {
- /** Records that data up to `offset` was written at `timestamp`. */
- def mark(timestamp: TimeStamp, offset: Long): Unit
- /** True when the current file should be rotated. */
- def shouldRotate: Boolean
- /** Resets the policy after a rotation has been performed. */
- def rotate(): Unit
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
new file mode 100644
index 0000000..a18cce6
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation
+import org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStore
+import org.apache.gearpump.util.LogUtil
+
+object HadoopCheckpointStore {
+  val LOG: Logger = LogUtil.getLogger(classOf[HadoopCheckpointStore])
+}
+
+/**
+ * Stores timestamp-checkpoint mapping to Hadoop-compatible filesystem.
+ *
+ * Store file layout: one record per persisted checkpoint, as written by
+ * [[org.apache.gearpump.streaming.hadoop.lib.HadoopCheckpointStoreWriter]].
+ * {{{
+ * timestamp1, length1, checkpoint1
+ * timestamp2, length2, checkpoint2
+ * ...
+ * timestampN, lengthN, checkpointN
+ * }}}
+ *
+ * Store file names:
+ *  - checkpoints-\$startTime.store: temporary store that is still being written
+ *  - checkpoints-\$startTime-\$endTime.store: complete store produced by a rotation
+ *
+ * @param dir directory that holds the store files
+ * @param fs file system used to list, rename and delete store files
+ * @param hadoopConfig configuration used to open store readers and writers
+ * @param rotation policy deciding when the temporary store is completed
+ */
+class HadoopCheckpointStore(
+    dir: Path,
+    fs: FileSystem,
+    hadoopConfig: Configuration,
+    rotation: Rotation)
+  extends CheckpointStore {
+  import HadoopCheckpointStore.LOG
+
+  private[hadoop] var curTime = 0L
+  private[hadoop] var curStartTime = curTime
+  private[hadoop] var curFile: Option[String] = None
+  private[hadoop] var curWriter: Option[HadoopCheckpointStoreWriter] = None
+  // regex (checkpoints-$startTime-$endTime.store) for complete checkpoint file
+  private val compRegex = """checkpoints-(\d+)-(\d+)\.store""".r
+  // regex (checkpoints-$startTime.store) for temporary checkpoint file
+  private val tempRegex = """checkpoints-(\d+)\.store""".r
+
+  /**
+   * Persists a pair of timestamp and checkpoint, which:
+   *
+   * 1. creates a temporary checkpoint file, checkpoints-\$startTime.store, if not exist
+   * 2. writes out (timestamp, checkpoint) and marks rotation
+   * 3. rotates checkpoint file if needed
+   *   a. closes current writer and reset
+   *   b. renames temporary checkpoint file to checkpoints-\$startTime-\$endTime.store
+   *   c. rotation rotates
+   */
+  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+    curTime = timestamp
+    if (curWriter.isEmpty) {
+      curStartTime = curTime
+      curFile = Some(s"checkpoints-$curStartTime.store")
+      curWriter = curFile.map(file =>
+        new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig))
+    }
+
+    curWriter.foreach { w =>
+      val offset = w.write(timestamp, checkpoint)
+      rotation.mark(timestamp, offset)
+    }
+
+    if (rotation.shouldRotate) {
+      curFile.foreach { f =>
+        // Close before renaming so the store is complete under its final name and no open
+        // handle is moved (some file systems refuse to rename open files).
+        curWriter.foreach(_.close())
+        curWriter = None
+        val completed = s"checkpoints-$curStartTime-$curTime.store"
+        if (!fs.rename(new Path(dir, f), new Path(dir, completed))) {
+          LOG.warn(s"Failed to rename checkpoint store $f to $completed in $dir")
+        }
+      }
+      rotation.rotate()
+    }
+  }
+
+  /**
+   * Recovers checkpoint given timestamp, which
+   * {{{
+   * 1. returns None if no store exists
+   * 2. searches checkpoint stores for
+   *   a. complete store checkpoints-\$startTime-\$endTime.store
+   *      where startTime <= timestamp <= endTime
+   *   b. temporary store checkpoints-\$startTime.store
+   *      where startTime <= timestamp
+   * 3. renames a matching temporary store to checkpoints-\$startTime-\$timestamp.store
+   * 4. deletes all complete stores whose startTime is larger than timestamp
+   * 5. looks for the checkpoint in the found store
+   * }}}
+   * Files in the directory that match neither store name are ignored.
+   */
+  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+    if (fs.exists(dir)) {
+      findCheckpointFile(timestamp).flatMap(file => readCheckpoint(file, timestamp))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Scans `dir` for the store that covers `timestamp` (steps 2-4 of recover).
+   * If several stores match, the last one listed wins.
+   */
+  private def findCheckpointFile(timestamp: TimeStamp): Option[Path] = {
+    var checkpointFile: Option[Path] = None
+    fs.listStatus(dir).foreach { status =>
+      val file = status.getPath
+      val fileName = file.getName
+      fileName match {
+        case compRegex(start, end) =>
+          val startTime = start.toLong
+          val endTime = end.toLong
+          if (timestamp >= startTime && timestamp <= endTime) {
+            checkpointFile = Some(new Path(dir, fileName))
+          } else if (timestamp < startTime) {
+            fs.delete(file, true)
+          }
+        case tempRegex(start) =>
+          val startTime = start.toLong
+          if (timestamp >= startTime) {
+            val tempFile = new Path(dir, fileName)
+            val newFile = new Path(dir, s"checkpoints-$startTime-$timestamp.store")
+            checkpointFile = if (fs.rename(tempFile, newFile)) {
+              Some(newFile)
+            } else {
+              // Keep reading from the temporary store rather than a path that does not exist.
+              LOG.warn(s"Failed to rename checkpoint store $tempFile to $newFile")
+              Some(tempFile)
+            }
+          }
+        case _ =>
+          // Not a checkpoint store: leave it alone instead of failing with a MatchError.
+      }
+    }
+    checkpointFile
+  }
+
+  /** Returns the checkpoint recorded for exactly `timestamp` in `file`, if any. */
+  private def readCheckpoint(file: Path, timestamp: TimeStamp): Option[Array[Byte]] = {
+    val reader = new HadoopCheckpointStoreReader(file, hadoopConfig)
+    try {
+      reader.collectFirst { case (time, bytes) if time == timestamp => bytes }
+    } finally {
+      reader.close()
+    }
+  }
+
+  /** Closes the current writer, if any; calling it again is a no-op. */
+  override def close(): Unit = {
+    curWriter.foreach(_.close())
+    curWriter = None
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
new file mode 100644
index 0000000..e5e0f13
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
+import org.apache.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
+
+object HadoopCheckpointStoreFactory {
+  /** Layout version; stores are placed under a `v<VERSION>` sub-directory of `dir`. */
+  val VERSION = 1
+}
+
+/**
+ * Creates one [[HadoopCheckpointStore]] per task, rooted at
+ * `<dir>/v<VERSION>/app<appId>-task<processorId>_<index>`.
+ *
+ * `hadoopConfig` is `@transient`, so Java serialization of this factory writes and reads it
+ * explicitly through Configuration's own `write`/`readFields` methods.
+ *
+ * @param dir root directory for checkpoint stores
+ * @param hadoopConfig configuration used to resolve the file system and open store files
+ * @param rotation rotation policy handed to each created store (default: 128 MiB file size)
+ */
+class HadoopCheckpointStoreFactory(
+    dir: String,
+    @transient private var hadoopConfig: Configuration,
+    rotation: Rotation = new FileSizeRotation(128L * 1024 * 1024))
+  extends CheckpointStoreFactory {
+  import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory.VERSION
+
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    out.defaultWriteObject()
+    // The transient configuration is appended after the default fields.
+    hadoopConfig.write(out)
+  }
+
+  private def readObject(in: ObjectInputStream): Unit = {
+    in.defaultReadObject()
+    // Start from a Configuration without default resources and load what writeObject wrote.
+    val restored = new Configuration(false)
+    restored.readFields(in)
+    hadoopConfig = restored
+  }
+
+  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
+    val task = taskContext.taskId
+    val versionDir = dir + Path.SEPARATOR + s"v$VERSION"
+    val taskDir = s"app${taskContext.appId}-task${task.processorId}_${task.index}"
+    val storeDir = new Path(versionDir, taskDir)
+    new HadoopCheckpointStore(
+      storeDir,
+      HadoopUtil.getFileSystemForPath(storeDir, hadoopConfig),
+      hadoopConfig,
+      rotation)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
new file mode 100644
index 0000000..bb56003
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop
+
+import java.text.SimpleDateFormat
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.HdfsConfiguration
+import org.apache.hadoop.io.SequenceFile
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
+import org.apache.gearpump.streaming.hadoop.lib.format.{DefaultSequenceFormatter, OutputFormatter}
+import org.apache.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
+
+/**
+ * Data sink that writes messages into Hadoop SequenceFiles.
+ *
+ * Files are created under `<basePath>/<appName>-task<processorId>_<index>/` and named after
+ * the time they were opened (`yyyy_MM_dd-HH-mm-ss`, plus a `-N` suffix if that name is
+ * already taken). A new file is started whenever the rotation policy asks for it.
+ *
+ * @param userConfig user configuration, passed to HadoopUtil.login when the sink opens
+ * @param basePath root directory for the output files
+ * @param rotation decides when to start a new file (default: 128 MiB file size)
+ * @param sequenceFormat converts messages into SequenceFile keys and values
+ */
+class SequenceFileSink(
+    userConfig: UserConfig,
+    basePath: String,
+    rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong),
+    sequenceFormat: OutputFormatter = new DefaultSequenceFormatter)
+  extends DataSink {
+  @transient private lazy val configuration = new HdfsConfiguration()
+  private val dateFormat = new SimpleDateFormat("yyyy_MM_dd-HH-mm-ss")
+  // Current output file writer; None before open() and after close().
+  private var writer: Option[SequenceFile.Writer] = None
+  private var taskId: TaskId = null
+  private var appName: String = null
+
+  /**
+   * Starts connection to data sink
+   *
+   * Invoked at onStart() method of [[org.apache.gearpump.streaming.task.Task]]
+   *
+   * @param context is the task context at runtime
+   */
+  override def open(context: TaskContext): Unit = {
+    HadoopUtil.login(userConfig, configuration)
+    this.appName = context.appName
+    this.taskId = context.taskId
+    this.writer = Some(getNextWriter)
+  }
+
+  /**
+   * Writes message into data sink
+   *
+   * Invoked at onNext() method of [[org.apache.gearpump.streaming.task.Task]]
+   * @param message wraps data to be written out
+   */
+  override def write(message: Message): Unit = {
+    val key = sequenceFormat.getKey(message)
+    val value = sequenceFormat.getValue(message)
+    val current = currentWriter()
+    current.append(key, value)
+    rotation.mark(message.timestamp, current.getLength)
+    if (rotation.shouldRotate) {
+      closeWriter()
+      this.writer = Some(getNextWriter)
+      rotation.rotate()
+    }
+  }
+
+  /**
+   * Closes connection to data sink
+   *
+   * Invoked at onClose() method of [[org.apache.gearpump.streaming.task.Task]]
+   */
+  override def close(): Unit = {
+    closeWriter()
+  }
+
+  /** Returns the open writer, opening a new file first if there is none. */
+  private def currentWriter(): SequenceFile.Writer = {
+    writer.getOrElse {
+      val created = getNextWriter
+      writer = Some(created)
+      created
+    }
+  }
+
+  /**
+   * Flushes and closes the current writer, if any. The writer is forgotten first, so
+   * close() is idempotent, and close is attempted even when the flush fails.
+   */
+  private def closeWriter(): Unit = {
+    writer.foreach { w =>
+      writer = None
+      try {
+        w.hflush()
+      } finally {
+        w.close()
+      }
+    }
+  }
+
+  private def getNextWriter: SequenceFile.Writer = {
+    SequenceFile.createWriter(
+      configuration,
+      SequenceFile.Writer.file(getNextFilePath),
+      SequenceFile.Writer.keyClass(sequenceFormat.getKeyClass),
+      SequenceFile.Writer.valueClass(sequenceFormat.getValueClass)
+    )
+  }
+
+  /**
+   * Path for the next output file. Names have one-second resolution, so a rotation within
+   * the same second would reuse a name and overwrite the earlier file; in that case a
+   * numeric suffix is appended until an unused name is found.
+   */
+  private def getNextFilePath: Path = {
+    val base = new Path(basePath, s"$appName-task${taskId.processorId}_${taskId.index}")
+    val candidate = new Path(base, dateFormat.format(new java.util.Date))
+    val fs = candidate.getFileSystem(configuration)
+    if (!fs.exists(candidate)) {
+      candidate
+    } else {
+      Iterator.from(1)
+        .map(i => new Path(base, s"${candidate.getName}-$i"))
+        .filterNot(p => fs.exists(p))
+        .next()
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
new file mode 100644
index 0000000..082e963
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib
+
+import java.io.EOFException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Iterates over the (timestamp, checkpoint bytes) records of a checkpoint store file written
+ * by [[HadoopCheckpointStoreWriter]]. Each record is a Long timestamp, an Int length, and
+ * `length` bytes of data.
+ *
+ * The underlying stream is closed once end-of-file (or a read error) is reached. close() may
+ * also be called explicitly and is safe to call more than once. A truncated trailing record
+ * is treated as end of data.
+ *
+ * @param path store file to read
+ * @param hadoopConfig configuration used to open the file
+ */
+class HadoopCheckpointStoreReader(
+    path: Path,
+    hadoopConfig: Configuration)
+  extends Iterator[(TimeStamp, Array[Byte])] {
+
+  private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
+  // Complete record read ahead by hasNext and not yet returned by next().
+  private var buffered: Option[(TimeStamp, Array[Byte])] = None
+  // Set once the stream has been closed; no further reads are attempted after that.
+  private var closed = false
+
+  /**
+   * Reads the next record into the buffer if none is buffered yet.
+   * Returns false (and closes the stream) at end-of-file; closes and rethrows on other errors.
+   */
+  override def hasNext: Boolean = {
+    if (buffered.isDefined) {
+      true
+    } else if (closed) {
+      false
+    } else {
+      try {
+        val timestamp = stream.readLong()
+        val length = stream.readInt()
+        val buffer = new Array[Byte](length)
+        stream.readFully(buffer)
+        // Publish the record only once it has been read completely, so a record cut short
+        // by EOF is never left half-buffered.
+        buffered = Some((timestamp, buffer))
+        true
+      } catch {
+        case e: EOFException =>
+          close()
+          false
+        case e: Exception =>
+          close()
+          throw e
+      }
+    }
+  }
+
+  /**
+   * Returns the next record. Works without a prior hasNext call.
+   *
+   * @throws NoSuchElementException when no record is left
+   */
+  override def next(): (TimeStamp, Array[Byte]) = {
+    val record = if (hasNext) buffered else None
+    buffered = None
+    record.getOrElse(throw new NoSuchElementException(s"No more checkpoint records in $path"))
+  }
+
+  /** Closes the underlying stream; subsequent calls are no-ops. */
+  def close(): Unit = {
+    if (!closed) {
+      closed = true
+      stream.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
new file mode 100644
index 0000000..11c12c4
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Appends (timestamp, checkpoint) records to a checkpoint store file, in the format read by
+ * [[HadoopCheckpointStoreReader]]: a Long timestamp, an Int length, then the checkpoint bytes.
+ *
+ * The file is opened lazily on the first write. HadoopUtil.getOutputStream appends to it if it
+ * already exists and creates it otherwise.
+ *
+ * @param path store file to write
+ * @param hadoopConfig configuration used to open the file
+ */
+class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
+  // True while the lazy stream below is open, so close() neither forces it open
+  // (creating an empty file just to close it) nor closes it twice.
+  private var isOpen = false
+  private lazy val stream = {
+    val out = HadoopUtil.getOutputStream(path, hadoopConfig)
+    isOpen = true
+    out
+  }
+
+  /**
+   * Writes one record and hflushes it.
+   *
+   * @return the stream position after the write, as reported by getPos
+   */
+  def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
+    stream.writeLong(timestamp)
+    stream.writeInt(data.length)
+    stream.write(data)
+    stream.hflush()
+    stream.getPos()
+  }
+
+  /** Closes the file if it was opened; otherwise does nothing. Safe to call repeatedly. */
+  def close(): Unit = {
+    if (isOpen) {
+      isOpen = false
+      stream.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopUtil.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopUtil.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopUtil.scala
new file mode 100644
index 0000000..935b52c
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopUtil.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop.lib
+
+import java.io.File
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.util.{Constants, FileUtils}
+
+private[hadoop] object HadoopUtil {
+
+  /**
+   * Opens `path` for writing: appends when it already exists as a file, otherwise creates it.
+   *
+   * @param path file to write
+   * @param hadoopConfig configuration used to resolve the file system for `path`
+   * @return an output stream on the resolved file system
+   */
+  def getOutputStream(path: Path, hadoopConfig: Configuration): FSDataOutputStream = {
+    val dfs = getFileSystemForPath(path, hadoopConfig)
+    if (dfs.isFile(path)) dfs.append(path) else dfs.create(path)
+  }
+
+  /**
+   * Opens `path` for reading.
+   *
+   * @param path file to read
+   * @param hadoopConfig configuration used to resolve the file system for `path`
+   * @return an input stream on the resolved file system
+   */
+  def getInputStream(path: Path, hadoopConfig: Configuration): FSDataInputStream =
+    getFileSystemForPath(path, hadoopConfig).open(path)
+
+  /**
+   * Resolves the file system that serves `path`.
+   *
+   * For local paths this returns the raw file system underlying `LocalFileSystem`, so that
+   * calls to flush() actually flush the stream to the file.
+   *
+   * @param path path whose file system is wanted
+   * @param hadoopConfig configuration used for the lookup
+   * @return the (raw, when local) file system for `path`
+   */
+  def getFileSystemForPath(path: Path, hadoopConfig: Configuration): FileSystem = {
+    path.getFileSystem(hadoopConfig) match {
+      case localFs: LocalFileSystem => localFs.getRawFileSystem
+      case fs => fs
+    }
+  }
+
+  /**
+   * Logs in from a Kerberos keytab when Hadoop security is enabled; does nothing otherwise.
+   *
+   * The principal is read from `Constants.GEARPUMP_KERBEROS_PRINCIPAL` and the keytab bytes from
+   * `Constants.GEARPUMP_KEYTAB_FILE` in `userConfig`. The keytab is written to a temporary file
+   * restricted to its owner. That file is deleted after the login attempt, whether the login
+   * succeeded or threw.
+   *
+   * @param userConfig user configuration carrying the principal and keytab content
+   * @param configuration Hadoop configuration installed into `UserGroupInformation`
+   * @throws java.lang.Exception if security is enabled but the principal or keytab is missing
+   */
+  def login(userConfig: UserConfig, configuration: Configuration): Unit = {
+    if (UserGroupInformation.isSecurityEnabled) {
+      val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
+      val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
+      (principal, keytabContent) match {
+        case (Some(user), Some(keytab)) =>
+          loginFromKeytab(user, keytab, configuration)
+        case _ =>
+          val errorMsg = s"HDFS is security enabled, user should provide kerberos principal in " +
+            s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} " +
+            s"and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}"
+          throw new Exception(errorMsg)
+      }
+    }
+  }
+
+  // Writes the keytab to an owner-only temp file, logs in from it, and always deletes the file.
+  private def loginFromKeytab(
+      principal: String, keytab: Array[Byte], configuration: Configuration): Unit = {
+    val keytabFile = File.createTempFile("login", ".keytab")
+    try {
+      // File.createTempFile does not restrict permissions, so the file may be readable by other
+      // users depending on the umask. Revoke access for everybody, then grant the owner
+      // read/write, before any secret bytes are written.
+      keytabFile.setExecutable(false, false)
+      keytabFile.setReadable(false, false)
+      keytabFile.setWritable(false, false)
+      keytabFile.setReadable(true, true)
+      keytabFile.setWritable(true, true)
+      FileUtils.writeByteArrayToFile(keytabFile, keytab)
+      keytabFile.setWritable(false)
+
+      UserGroupInformation.setConfiguration(configuration)
+      UserGroupInformation.loginUserFromKeytab(principal, keytabFile.getAbsolutePath)
+    } finally {
+      // Never leave credential material on disk, even if the login fails.
+      keytabFile.delete()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
new file mode 100644
index 0000000..318c071
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop.lib.format
+
+import org.apache.hadoop.io.{LongWritable, Text, Writable}
+
+import org.apache.gearpump.Message
+
+/**
+ * Default `OutputFormatter` for sequence files.
+ *
+ * The key is the message timestamp as a `LongWritable`, and the value is the message payload as
+ * `Text`. The payload is cast to `String`, so a non-null payload of any other type fails with a
+ * `ClassCastException`.
+ */
+class DefaultSequenceFormatter extends OutputFormatter {
+
+  override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable]
+
+  override def getValueClass: Class[_ <: Writable] = classOf[Text]
+
+  override def getKey(message: Message): Writable = {
+    val timestamp = message.timestamp
+    new LongWritable(timestamp)
+  }
+
+  override def getValue(message: Message): Writable = {
+    val payload = message.msg.asInstanceOf[String]
+    new Text(payload)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
new file mode 100644
index 0000000..435d0fc
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.hadoop.lib.format
+
+import org.apache.hadoop.io.Writable
+
+import org.apache.gearpump.Message
+
+/**
+ * Turns a `Message` into the key/value `Writable` pair written to a Hadoop output.
+ * It also declares the classes used for those keys and values.
+ */
+trait OutputFormatter extends Serializable {
+
+  /** Builds the key written for `message`. */
+  def getKey(message: Message): Writable
+
+  /** Builds the value written for `message`. */
+  def getValue(message: Message): Writable
+
+  /** Class of the keys produced by `getKey` (expected to match them). */
+  def getKeyClass: Class[_ <: Writable]
+
+  /** Class of the values produced by `getValue` (expected to match them). */
+  def getValueClass: Class[_ <: Writable]
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
new file mode 100644
index 0000000..72be9c3
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Rotation policy that triggers once the last reported offset reaches `maxBytes`.
+ *
+ * `mark` replaces the stored offset rather than accumulating it, so callers are expected to pass
+ * an absolute position in the current file.
+ *
+ * @param maxBytes offset at or beyond which `shouldRotate` becomes true
+ */
+case class FileSizeRotation(maxBytes: Long) extends Rotation {
+
+  // Offset from the most recent mark(); reset to zero by rotate().
+  private var lastOffset: Long = 0L
+
+  override def mark(timestamp: TimeStamp, offset: Long): Unit = { lastOffset = offset }
+
+  override def shouldRotate: Boolean = !(lastOffset < maxBytes)
+
+  override def rotate(): Unit = { lastOffset = 0L }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
new file mode 100644
index 0000000..cd8c04a
--- /dev/null
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Policy deciding when the current checkpoint file should be rolled over.
+ *
+ * Implementations may keep state between calls; for example, `FileSizeRotation` stores the last
+ * marked offset.
+ */
+trait Rotation extends Serializable {
+
+  /**
+   * Reports progress on the current file.
+   *
+   * @param timestamp timestamp of the checkpoint just written
+   * @param offset offset within the current file (assumed to be the position after the write;
+   *               confirm at call sites)
+   */
+  def mark(timestamp: TimeStamp, offset: Long): Unit
+
+  /** Whether the current file should be rotated now. */
+  def shouldRotate: Boolean
+
+  /** Tells the policy that a rotation happened, so it can reset any state it keeps. */
+  def rotate(): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
deleted file mode 100644
index cc8a5f0..0000000
--- a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
-import io.gearpump.streaming.task.TaskId
-
-class HadoopCheckpointStoreIntegrationSpec
- extends PropSpec with PropertyChecks with MockitoSugar with Matchers {
-
- property("HadoopCheckpointStore should persist and recover checkpoints") {
- val fileSizeGen = Gen.chooseNum[Int](100, 1000)
- forAll(fileSizeGen) { (fileSize: Int) =>
- val userConfig = UserConfig.empty
- val taskContext = MockUtil.mockTaskContext
- val hadoopConfig = new Configuration()
-
- when(taskContext.appId).thenReturn(0)
- when(taskContext.taskId).thenReturn(TaskId(0, 0))
-
- val rootDirName = "test"
- val rootDir = new Path(rootDirName + Path.SEPARATOR +
- s"v${HadoopCheckpointStoreFactory.VERSION}")
- val subDir = new Path(rootDir, "app0-task0_0")
-
- val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig)
- fs.delete(rootDir, true)
- fs.exists(rootDir) shouldBe false
-
- val checkpointStoreFactory = new HadoopCheckpointStoreFactory(
- rootDirName, hadoopConfig, new FileSizeRotation(fileSize))
- val checkpointStore = checkpointStoreFactory.getCheckpointStore(userConfig, taskContext)
-
- checkpointStore.persist(0L, Array(0.toByte))
-
- val tempFile = new Path(subDir, "checkpoints-0.store")
- fs.exists(tempFile) shouldBe true
-
- checkpointStore.persist(1L, Array.fill(fileSize)(0.toByte))
- fs.exists(tempFile) shouldBe false
- fs.exists(new Path(subDir, "checkpoints-0-1.store")) shouldBe true
-
- checkpointStore.persist(2L, Array(0.toByte))
- val newTempFile = new Path(subDir, "checkpoints-2.store")
- fs.exists(newTempFile) shouldBe true
-
- for (i <- 0 to 2) {
- val optCp = checkpointStore.recover(i)
- optCp should not be empty
- }
- fs.exists(newTempFile) shouldBe false
- fs.exists(new Path(subDir, "checkpoints-2-2.store")) shouldBe true
-
- checkpointStore.close()
- fs.delete(rootDir, true)
- fs.close()
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
deleted file mode 100644
index 9b4057c..0000000
--- a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.hadoop.lib.rotation
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.TimeStamp
-
-class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
-
- val timestampGen = Gen.chooseNum[Long](0L, 1000L)
- val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue)
-
- property("FileSize rotation rotates on file size") {
- forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) =>
- val rotation = new FileSizeRotation(fileSize)
- rotation.shouldRotate shouldBe false
- rotation.mark(timestamp, rotation.maxBytes / 2)
- rotation.shouldRotate shouldBe false
- rotation.mark(timestamp, rotation.maxBytes)
- rotation.shouldRotate shouldBe true
- rotation.rotate
- rotation.shouldRotate shouldBe false
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
new file mode 100644
index 0000000..4fd8dc1
--- /dev/null
+++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.task.TaskId
+
+/**
+ * Integration test for the Hadoop checkpoint store on the local file system.
+ *
+ * Checks checkpoint file naming across persist, size-based rotation and recovery. Files are
+ * written under the relative directory `test`, which is removed after every generated case.
+ */
+class HadoopCheckpointStoreIntegrationSpec
+  extends PropSpec with PropertyChecks with MockitoSugar with Matchers {
+
+  property("HadoopCheckpointStore should persist and recover checkpoints") {
+    val fileSizeGen = Gen.chooseNum[Int](100, 1000)
+    forAll(fileSizeGen) { (fileSize: Int) =>
+      val userConfig = UserConfig.empty
+      val taskContext = MockUtil.mockTaskContext
+      val hadoopConfig = new Configuration()
+
+      when(taskContext.appId).thenReturn(0)
+      when(taskContext.taskId).thenReturn(TaskId(0, 0))
+
+      val rootDirName = "test"
+      val rootDir = new Path(rootDirName + Path.SEPARATOR +
+        s"v${HadoopCheckpointStoreFactory.VERSION}")
+      val subDir = new Path(rootDir, "app0-task0_0")
+
+      val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig)
+      fs.delete(rootDir, true)
+      fs.exists(rootDir) shouldBe false
+
+      val checkpointStoreFactory = new HadoopCheckpointStoreFactory(
+        rootDirName, hadoopConfig, new FileSizeRotation(fileSize))
+      val checkpointStore = checkpointStoreFactory.getCheckpointStore(userConfig, taskContext)
+
+      // Clean up even when an assertion fails, so later cases and runs start from scratch.
+      try {
+        // The first record opens a temp file named after its start timestamp.
+        checkpointStore.persist(0L, Array(0.toByte))
+        val tempFile = new Path(subDir, "checkpoints-0.store")
+        fs.exists(tempFile) shouldBe true
+
+        // A `fileSize`-byte record reaches the rotation threshold; the temp file is expected to
+        // be replaced by checkpoints-<start>-<end>.store.
+        checkpointStore.persist(1L, Array.fill(fileSize)(0.toByte))
+        fs.exists(tempFile) shouldBe false
+        fs.exists(new Path(subDir, "checkpoints-0-1.store")) shouldBe true
+
+        checkpointStore.persist(2L, Array(0.toByte))
+        val newTempFile = new Path(subDir, "checkpoints-2.store")
+        fs.exists(newTempFile) shouldBe true
+
+        for (i <- 0 to 2) {
+          val optCp = checkpointStore.recover(i)
+          optCp should not be empty
+        }
+        // Recovery is expected to finalize the still-open temp file.
+        fs.exists(newTempFile) shouldBe false
+        fs.exists(new Path(subDir, "checkpoints-2-2.store")) shouldBe true
+      } finally {
+        checkpointStore.close()
+        fs.delete(rootDir, true)
+        fs.close()
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
new file mode 100644
index 0000000..4eab3c9
--- /dev/null
+++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.hadoop.lib.rotation
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Property tests for `FileSizeRotation`: rotation triggers exactly when the marked offset reaches
+ * `maxBytes`, and `rotate()` resets it.
+ */
+class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
+
+  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+  val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue)
+
+  property("FileSize rotation rotates on file size") {
+    forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) =>
+      val rotation = new FileSizeRotation(fileSize)
+      rotation.shouldRotate shouldBe false
+      rotation.mark(timestamp, rotation.maxBytes / 2)
+      rotation.shouldRotate shouldBe false
+      // The threshold is inclusive: one byte short must not rotate, reaching it must.
+      rotation.mark(timestamp, rotation.maxBytes - 1)
+      rotation.shouldRotate shouldBe false
+      rotation.mark(timestamp, rotation.maxBytes)
+      rotation.shouldRotate shouldBe true
+      // rotate() is side-effecting; call it with parentheses (no auto-application).
+      rotation.rotate()
+      rotation.shouldRotate shouldBe false
+    }
+  }
+}