You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ch...@apache.org on 2019/04/27 16:57:36 UTC
[hama] branch componentization updated: 1. Upgrade zio to 1.0-RC4.
2. Switch Communicator to use zio instead.
This is an automated email from the ASF dual-hosted git repository.
chl501 pushed a commit to branch componentization
in repository https://gitbox.apache.org/repos/asf/hama.git
The following commit(s) were added to refs/heads/componentization by this push:
new 7fceb26 1. Upgrade zio to 1.0-RC4. 2. Switch Communicator to use zio instead.
7fceb26 is described below
commit 7fceb26be045ffa62aa5e440deac23da5c423899
Author: Chiahung Lin <ch...@apache.org>
AuthorDate: Sat Apr 27 18:56:55 2019 +0200
1. Upgrade zio to 1.0-RC4.
2. Switch Communicator to use zio instead.
---
build.sbt | 2 +-
.../main/scala/org/apache/hama/conf/Setting.scala | 3 -
.../org/apache/hama/membership/Communicator.scala | 177 +++------------------
.../scala/org/apache/hama/membership/Driver.scala | 43 -----
.../apache/hama/membership/CommunicatorSpec.scala | 135 ----------------
5 files changed, 21 insertions(+), 339 deletions(-)
diff --git a/build.sbt b/build.sbt
index 60002b8..1f27989 100644
--- a/build.sbt
+++ b/build.sbt
@@ -22,7 +22,7 @@ lazy val common = Seq (
scalacOptions := Seq ("-deprecation", "-Ypartial-unification"),
libraryDependencies ++= Seq (
"org.typelevel" %% "cats-core" % "1.6.0",
- "org.scalaz" %% "scalaz-zio" % "0.3.1",
+ "org.scalaz" %% "scalaz-zio" % "1.0-RC4",
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
)
)
diff --git a/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala b/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala
index 1c7414f..97f092e 100644
--- a/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala
+++ b/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala
@@ -50,7 +50,4 @@ object Setting {
}
- // TODO: hadoop configuration to
- // def from(configuration: Configuration): Setting = ???
-
}
diff --git a/membership/src/main/scala/org/apache/hama/membership/Communicator.scala b/membership/src/main/scala/org/apache/hama/membership/Communicator.scala
index 95ea10f..26f0117 100644
--- a/membership/src/main/scala/org/apache/hama/membership/Communicator.scala
+++ b/membership/src/main/scala/org/apache/hama/membership/Communicator.scala
@@ -22,6 +22,8 @@ import io.aeron.FragmentAssembler
import io.aeron.Publication
import io.aeron.Publication._
import io.aeron.Subscription
+import io.aeron.driver.MediaDriver
+import io.aeron.driver.ThreadingMode._
import io.aeron.logbuffer.Header
import java.util.concurrent.atomic.AtomicBoolean
import org.agrona.BitUtil._
@@ -37,170 +39,31 @@ import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.util.Either
-object Communicator {
-
- lazy val defaultAeronContext = new Aeron.Context()
-
- lazy val defaultChannel = Channel()
-
- final case object BackPressured extends RuntimeException (
- "Failure due to back pressure!"
- )
-
- final case object NotConnected extends RuntimeException (
- s"Failure because publisher is not connected to subscriber!"
- )
-
- final case object AdminAction extends RuntimeException (
- "Failure because of administration action!"
- )
-
- final case object Closed extends RuntimeException (
- "Failure because publication is closed!"
- )
-
- final case object MaxPositionExceeded extends RuntimeException (
- "Failure due to publication reaching max position!"
- )
-
- final case object UnknownReason extends RuntimeException (
- "Failure due to unknown reason!"
- )
+import scalaz.zio._
- final case class Channel(host: String = localhost, port: Int = 12345) {
-
- require(!host.isBlank, "Channel's host value is not presented!")
-
- require(port.isValid, s"Invalid Channel port value: $port!")
-
- protected[membership] val channel =
- (host: String, port: Int) => s"aeron:udp?endpoint=$host:$port"
+trait Communicator
+object Communicator {
- override def toString(): String = channel(host, port)
+ def driver: Task[MediaDriver] = ZIO.effect {
+ val context = new MediaDriver.Context().
+ threadingMode(DEDICATED).
+ conductorIdleStrategy(new BusySpinIdleStrategy).
+ receiverIdleStrategy(new BusySpinIdleStrategy).
+ senderIdleStrategy(new BusySpinIdleStrategy)
+ MediaDriver.launch(context)
}
- final case class Publisher(publication: Publication) extends Logging {
-
- require(null != publication, "Aeron Publication is not presented!")
-
- def isConnected (
- implicit _deadline: Long = (System.nanoTime + 3.seconds.toNanos),
- countLimit: Int = 3, sleep: () => Unit = { () => Thread.sleep(1) }
- ): Boolean = {
- @tailrec
- def _isConnected(pub: Publication, times: Int = 0): Boolean = {
- val connected = pub.isConnected
- if(countLimit > times && (false == connected)) {
- if(System.nanoTime >= _deadline) false else {
- sleep()
- _isConnected(pub, times + 1)
- }
- } else connected
- }
- _isConnected(publication)
- }
-
- def send (
- messageBytes: Array[Byte],
- bufferCapacity: Int = 512,
- boundaryAlighment: Int = CACHE_LINE_LENGTH,
- deadline: Long = (System.nanoTime + 3.seconds.toNanos),
- sleep: () => Unit = { () => Thread.sleep(1) }
- ): Either[Throwable, Publisher] = {
- // TODO: replace with isConnected
- while(!publication.isConnected()) {
- if(System.nanoTime >= deadline) {
- return Left(new RuntimeException (
- s"Can't connect to Publication(${publication.channel}, ${publication.streamId})"
- ))
- }
- sleep()
- }
- val buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned (
- bufferCapacity, boundaryAlighment
- ))
- buffer.putBytes(0, messageBytes)
- val result = publication.offer(buffer, 0, messageBytes.length)
- log.debug(s"Publication.offer() returns result $result")
- result match {
- case res if res < 0L && BACK_PRESSURED == res => Left(BackPressured)
- case res if res < 0L && NOT_CONNECTED == res => Left(NotConnected)
- case res if res < 0L && ADMIN_ACTION == res => Left(AdminAction)
- case res if res < 0L && CLOSED == res => Left(Closed)
- case res if res < 0L && MAX_POSITION_EXCEEDED == res => Left(MaxPositionExceeded)
- case res if res < 0L => Left(UnknownReason)
- case _ => Right(this)
- }
- }
-
- def run[O](f: Publication => O): O = f(publication)
-
- def close() = try {} finally { publication.close }
-
+ def aeron: Task[Aeron] = ZIO.effect {
+ val context = new Aeron.Context()
+ Aeron.connect(context)
}
- final case class Subscriber(subscription: Subscription) extends Logging {
-
- require(null != subscription, "Aeron Subscription is not presented!")
-
- def receive(f: (DirectBuffer, Int, Int, Header) => Boolean) {
- var continuous = true
- val handler = new FragmentAssembler({
- (buffer: DirectBuffer, offset: Int, length: Int, header: Header) => {
- val shouldContinuous = f(buffer, offset, length, header)
- continuous = shouldContinuous
- }
- })
- val idleStrategy = new BusySpinIdleStrategy()
- while(continuous) {
- val read = subscription.poll(handler, 10)
- idleStrategy.idle(read)
- }
- }
-
- def run[O](f: Subscription => O): O = f(subscription)
-
- def close = try {} finally { subscription.close }
-
+ def subscriber(aeron: Task[Aeron]): Task[Subscription] = aeron.flatMap { a =>
+ Task.effect(a.addSubscription("aeron:udp?endpoint=localhost:12345", 10))
}
- def create(ctx: Aeron.Context = defaultAeronContext) =
- Communicator(aeron = Aeron.connect(ctx))
-
-}
-
-/**
- * Aeron client should only be created one per vm for different channels.
- * Otherwise vm may crash with error
- * {{{
- * org.agrona.concurrent.status.UnsafeBufferPosition.getVolatile() SIGSEGV
- * }}}
- */
-final case class Communicator (
- aeron: Aeron,
- publication: Option[Communicator.Publisher] = None,
- subscription: Option[Communicator.Subscriber] = None
-) extends Logging {
-
- import Communicator._
-
- require(null != aeron, "Aeron instance is not presented!")
-
- def withPublication (
- channel: Channel = defaultChannel, streamId: Int = 10
- ) = this.copy(publication = Option (
- Publisher(aeron.addPublication(channel.toString, streamId))
- ))
-
- def withSubscription (
- channel: Channel = defaultChannel, streamId: Int = 10
- ) = this.copy(subscription = Option (
- Subscriber(aeron.addSubscription(channel.toString, streamId))
- ))
-
- def close() = try {} finally {
- subscription.map(_.close)
- publication.map(_.close)
- if(!aeron.isClosed) aeron.close
+ def publisher(aeron: Task[Aeron]): Task[Publication] = aeron.flatMap { a =>
+ Task.effect(a.addPublication("aeron:udp?endpoint=localhost:12344", 10))
}
+
}
diff --git a/membership/src/main/scala/org/apache/hama/membership/Driver.scala b/membership/src/main/scala/org/apache/hama/membership/Driver.scala
deleted file mode 100644
index 385065c..0000000
--- a/membership/src/main/scala/org/apache/hama/membership/Driver.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.membership
-
-import io.aeron.driver.MediaDriver
-import io.aeron.driver.ThreadingMode._
-import org.agrona.concurrent.BusySpinIdleStrategy
-import scala.util.Try
-
-object Driver {
-
- lazy val defaultMediaDriverContext = new MediaDriver.Context().
- threadingMode(DEDICATED).
- conductorIdleStrategy(new BusySpinIdleStrategy).
- receiverIdleStrategy(new BusySpinIdleStrategy).
- senderIdleStrategy(new BusySpinIdleStrategy)
-
- def create(ctx: MediaDriver.Context = defaultMediaDriverContext) =
- new Driver(ctx)
-
-}
-
-final class Driver(ctx: MediaDriver.Context) {
-
- def start() = Try(MediaDriver.launch(ctx)).toEither
-
-}
-
diff --git a/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala b/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala
deleted file mode 100644
index 5e4ac54..0000000
--- a/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.membership
-
-import io.aeron.driver.MediaDriver
-import org.agrona.concurrent.ShutdownSignalBarrier
-import org.apache.hama.logging.Logging
-import org.scalatest._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Either
-import scalaz.zio._
-
-class CommunicatorSpec extends FlatSpec with Matchers with Logging {
-
- import Communicator._
-
- "Communicator with zio.Fiber" should "send/ receive message." in {
-
- val barrier = new ShutdownSignalBarrier
- val driverIO = IO.sync{
- val driver = Driver.create().start
- barrier.await
- driver
- }
-
- lazy val communicator = Communicator.create()
-
- def subscriptionIO(comm: Communicator) = IO.sync {
- comm.withSubscription().subscription.map { sub => sub.receive {
- (buffer, offset, length, header) =>
- val data = new Array[Byte](length)
- buffer.getBytes(offset, data)
- val hello = new String(data)
- log.info(s"Subscription handler receives message: $hello ")
- assert("hello".equals(hello))
- false // stop receive message while loop
- }}
- comm
- }
-
- def publicationIO(comm: Communicator) = IO.sync {
- comm.withPublication().publication.map { pub => pub.send (
- messageBytes = "hello".getBytes
- )}
- comm
- }
-
- val handlers = for {
- dio <- driverIO.fork
- sio <- subscriptionIO(communicator).fork
- pio <- publicationIO(communicator).fork
- cio <- IO.sync { barrier.signal; IO.unit }.fork
- _ <- cio.join
- p <- pio.join
- s <- sio.join
- d <- dio.join
- } yield (p, s, d)
-
- new RTS {}.unsafeRun {
- handlers.run
- }.map { case (p, s, d) =>
- p.close
- s.close
- d.map(_.close)
- }
- log.info("Done!")
- }
-
-/*
- "Communicator" should "send/ receive message." in {
- val barrier = new ShutdownSignalBarrier
- val f1 = Future {
- val driver = Driver.create().start
- barrier.await
- driver
- }
- val communicator = Communicator.create()
- val f2 = Future {
- communicator.withSubscription().subscription.map { sub => sub.receive {
- (buffer, offset, length, header) =>
- val data = new Array[Byte](length)
- buffer.getBytes(offset, data)
- val hello = new String(data)
- log.info(s"Receive string $hello")
- assert("hello".equals(hello))
- false
- }}
- communicator
- }
- val f3 = Future {
- communicator.withPublication().publication.map { pub => pub.send (
- messageBytes = "hello".getBytes
- )}
- communicator
- }
- Thread.sleep(3 * 1000)
- barrier.signal
- f2.onComplete {
- case Success(communicator) => communicator.close
- case Failure(ex) => throw ex
- }
- f3.onComplete {
- case Success(communicator) => communicator.close
- case Failure(ex) => throw ex
- }
- f1.onComplete {
- case Success(driver) => driver match {
- case Left(ex) => throw ex
- case Right(d) => d.close
- }
- case Failure(ex) => throw ex
- }
- }
-*/
-
-}
-