You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ch...@apache.org on 2019/04/27 16:57:36 UTC

[hama] branch componentization updated: 1. Upgrade zio to 1.0-RC4. 2. Switch Communicator to use zio instead.

This is an automated email from the ASF dual-hosted git repository.

chl501 pushed a commit to branch componentization
in repository https://gitbox.apache.org/repos/asf/hama.git


The following commit(s) were added to refs/heads/componentization by this push:
     new 7fceb26  1. Upgrade zio to 1.0-RC4. 2. Switch Communicator to use zio instead.
7fceb26 is described below

commit 7fceb26be045ffa62aa5e440deac23da5c423899
Author: Chiahung Lin <ch...@apache.org>
AuthorDate: Sat Apr 27 18:56:55 2019 +0200

    1. Upgrade zio to 1.0-RC4.
    2. Switch Communicator to use zio instead.
---
 build.sbt                                          |   2 +-
 .../main/scala/org/apache/hama/conf/Setting.scala  |   3 -
 .../org/apache/hama/membership/Communicator.scala  | 177 +++------------------
 .../scala/org/apache/hama/membership/Driver.scala  |  43 -----
 .../apache/hama/membership/CommunicatorSpec.scala  | 135 ----------------
 5 files changed, 21 insertions(+), 339 deletions(-)

diff --git a/build.sbt b/build.sbt
index 60002b8..1f27989 100644
--- a/build.sbt
+++ b/build.sbt
@@ -22,7 +22,7 @@ lazy val common = Seq (
   scalacOptions := Seq ("-deprecation", "-Ypartial-unification"), 
   libraryDependencies ++= Seq (
     "org.typelevel" %% "cats-core" % "1.6.0",
-    "org.scalaz" %% "scalaz-zio" % "0.3.1",
+    "org.scalaz" %% "scalaz-zio" % "1.0-RC4",
     "org.scalatest" %% "scalatest" % "3.0.5" % "test"
   )
 )
diff --git a/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala b/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala
index 1c7414f..97f092e 100644
--- a/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala
+++ b/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala
@@ -50,7 +50,4 @@ object Setting {
 
   }
 
-  // TODO: hadoop configuration to  
-  // def from(configuration: Configuration): Setting = ???
-
 }
diff --git a/membership/src/main/scala/org/apache/hama/membership/Communicator.scala b/membership/src/main/scala/org/apache/hama/membership/Communicator.scala
index 95ea10f..26f0117 100644
--- a/membership/src/main/scala/org/apache/hama/membership/Communicator.scala
+++ b/membership/src/main/scala/org/apache/hama/membership/Communicator.scala
@@ -22,6 +22,8 @@ import io.aeron.FragmentAssembler
 import io.aeron.Publication
 import io.aeron.Publication._
 import io.aeron.Subscription
+import io.aeron.driver.MediaDriver
+import io.aeron.driver.ThreadingMode._
 import io.aeron.logbuffer.Header
 import java.util.concurrent.atomic.AtomicBoolean
 import org.agrona.BitUtil._
@@ -37,170 +39,31 @@ import scala.concurrent.duration.Duration
 import scala.concurrent.duration.DurationInt
 import scala.util.Either
 
-object Communicator {
-
-  lazy val defaultAeronContext = new Aeron.Context()
-
-  lazy val defaultChannel = Channel()
-
-  final case object BackPressured extends RuntimeException (
-    "Failure due to back pressure!"
-  )
-
-  final case object NotConnected extends RuntimeException (
-    s"Failure because publisher is not connected to subscriber!"
-  )
-
-  final case object AdminAction extends RuntimeException (
-    "Failure because of administration action!"
-  )
-
-  final case object Closed extends RuntimeException (
-    "Failure because publication is closed!"
-  )
-
-  final case object MaxPositionExceeded extends RuntimeException (
-    "Failure due to publication reaching max position!"
-  )
-
-  final case object UnknownReason extends RuntimeException (
-    "Failure due to unknown reason!"
-  )
+import scalaz.zio._
 
-  final case class Channel(host: String = localhost, port: Int = 12345) {
-
-    require(!host.isBlank, "Channel's host value is not presented!")
-
-    require(port.isValid, s"Invalid Channel port value: $port!")
-
-    protected[membership] val channel = 
-      (host: String, port: Int) => s"aeron:udp?endpoint=$host:$port"
+trait Communicator
+object Communicator {
 
-    override def toString(): String = channel(host, port)
+  def driver: Task[MediaDriver] = ZIO.effect {
+    val context = new MediaDriver.Context().
+      threadingMode(DEDICATED).
+      conductorIdleStrategy(new BusySpinIdleStrategy).
+      receiverIdleStrategy(new BusySpinIdleStrategy).
+      senderIdleStrategy(new BusySpinIdleStrategy)
+    MediaDriver.launch(context)
   }
 
-  final case class Publisher(publication: Publication) extends Logging {
-
-    require(null != publication, "Aeron Publication is not presented!")
-
-    def isConnected (
-      implicit _deadline: Long = (System.nanoTime + 3.seconds.toNanos),
-      countLimit: Int = 3, sleep: () => Unit = { () => Thread.sleep(1) }
-    ): Boolean = {
-      @tailrec
-      def _isConnected(pub: Publication, times: Int = 0): Boolean = {
-        val connected = pub.isConnected
-        if(countLimit > times && (false == connected)) {
-          if(System.nanoTime >= _deadline) false else {
-            sleep() 
-            _isConnected(pub, times + 1)
-          }
-        } else connected
-      }
-      _isConnected(publication)
-    }
-
-    def send (
-      messageBytes: Array[Byte],
-      bufferCapacity: Int = 512, 
-      boundaryAlighment: Int = CACHE_LINE_LENGTH,
-      deadline: Long = (System.nanoTime + 3.seconds.toNanos),
-      sleep: () => Unit = { () => Thread.sleep(1) }
-    ): Either[Throwable, Publisher] = {
-      // TODO: replace with isConnected
-      while(!publication.isConnected()) { 
-        if(System.nanoTime >= deadline) { 
-          return Left(new RuntimeException (
-            s"Can't connect to Publication(${publication.channel}, ${publication.streamId})" 
-          )) 
-        }
-        sleep()
-      }
-      val buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned (
-        bufferCapacity, boundaryAlighment
-      ))
-      buffer.putBytes(0, messageBytes)
-      val result = publication.offer(buffer, 0, messageBytes.length)
-      log.debug(s"Publication.offer() returns result $result") 
-      result match {
-        case res if res < 0L && BACK_PRESSURED == res => Left(BackPressured)
-        case res if res < 0L && NOT_CONNECTED == res => Left(NotConnected)
-        case res if res < 0L && ADMIN_ACTION == res => Left(AdminAction)
-        case res if res < 0L && CLOSED == res => Left(Closed)
-        case res if res < 0L && MAX_POSITION_EXCEEDED == res => Left(MaxPositionExceeded)
-        case res if res < 0L => Left(UnknownReason)
-        case _ => Right(this)
-      }
-    } 
-
-    def run[O](f: Publication => O): O = f(publication)
-
-    def close() = try {} finally { publication.close }
-
+  def aeron: Task[Aeron] = ZIO.effect {
+    val context = new Aeron.Context()    
+    Aeron.connect(context) 
   }
 
-  final case class Subscriber(subscription: Subscription) extends Logging {
-
-    require(null != subscription, "Aeron Subscription is not presented!")
-
-    def receive(f: (DirectBuffer, Int, Int, Header) => Boolean) {
-      var continuous = true 
-      val handler = new FragmentAssembler({ 
-        (buffer: DirectBuffer, offset: Int, length: Int, header: Header) => {
-          val shouldContinuous = f(buffer, offset, length, header)
-          continuous = shouldContinuous
-        }
-      })
-      val idleStrategy = new BusySpinIdleStrategy()
-      while(continuous) {
-        val read = subscription.poll(handler, 10)
-        idleStrategy.idle(read)
-      }
-    }
-
-    def run[O](f: Subscription => O): O = f(subscription)
-
-    def close = try {} finally { subscription.close }
-
+  def subscriber(aeron: Task[Aeron]): Task[Subscription] = aeron.flatMap { a =>
+    Task.effect(a.addSubscription("aeron:udp?endpoint=localhost:12345", 10))
   }
 
-  def create(ctx: Aeron.Context = defaultAeronContext) = 
-    Communicator(aeron = Aeron.connect(ctx))
-
-}
-
-/**
- * Aeron client should only be created one per vm for different channels.
- * Otherwise vm may crash with error
- * {{{
- *   org.agrona.concurrent.status.UnsafeBufferPosition.getVolatile() SIGSEGV 
- * }}}
- */
-final case class Communicator (
-  aeron: Aeron, 
-  publication: Option[Communicator.Publisher] = None, 
-  subscription: Option[Communicator.Subscriber] = None
-) extends Logging {
-
-  import Communicator._
-
-  require(null != aeron, "Aeron instance is not presented!")
-
-  def withPublication (
-    channel: Channel = defaultChannel, streamId: Int = 10
-  ) = this.copy(publication = Option (
-    Publisher(aeron.addPublication(channel.toString, streamId))
-  ))
-
-  def withSubscription (
-    channel: Channel = defaultChannel, streamId: Int = 10
-  ) = this.copy(subscription = Option (
-    Subscriber(aeron.addSubscription(channel.toString, streamId))
-  ))
-
-  def close() = try {} finally {
-    subscription.map(_.close)
-    publication.map(_.close)
-    if(!aeron.isClosed) aeron.close
+  def publisher(aeron: Task[Aeron]): Task[Publication] = aeron.flatMap { a => 
+    Task.effect(a.addPublication("aeron:udp?endpoint=localhost:12344", 10))
   }
+
 }
diff --git a/membership/src/main/scala/org/apache/hama/membership/Driver.scala b/membership/src/main/scala/org/apache/hama/membership/Driver.scala
deleted file mode 100644
index 385065c..0000000
--- a/membership/src/main/scala/org/apache/hama/membership/Driver.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.membership
-
-import io.aeron.driver.MediaDriver
-import io.aeron.driver.ThreadingMode._
-import org.agrona.concurrent.BusySpinIdleStrategy
-import scala.util.Try
-
-object Driver {
-
-  lazy val defaultMediaDriverContext = new MediaDriver.Context().
-    threadingMode(DEDICATED).
-    conductorIdleStrategy(new BusySpinIdleStrategy).
-    receiverIdleStrategy(new BusySpinIdleStrategy).
-    senderIdleStrategy(new BusySpinIdleStrategy)
-
-  def create(ctx: MediaDriver.Context = defaultMediaDriverContext) = 
-    new Driver(ctx)
-
-}
-
-final class Driver(ctx: MediaDriver.Context) {
-
-  def start() = Try(MediaDriver.launch(ctx)).toEither
-
-}
-
diff --git a/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala b/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala
deleted file mode 100644
index 5e4ac54..0000000
--- a/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.membership
-
-import io.aeron.driver.MediaDriver
-import org.agrona.concurrent.ShutdownSignalBarrier  
-import org.apache.hama.logging.Logging
-import org.scalatest._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Either
-import scalaz.zio._
-
-class CommunicatorSpec extends FlatSpec with Matchers with Logging {
-
-  import Communicator._
-
-  "Communicator with zio.Fiber" should "send/ receive message." in {
-
-    val barrier = new ShutdownSignalBarrier 
-    val driverIO = IO.sync{ 
-        val driver = Driver.create().start
-        barrier.await
-        driver
-    }
-
-    lazy val communicator = Communicator.create()
-
-    def subscriptionIO(comm: Communicator) = IO.sync { 
-      comm.withSubscription().subscription.map { sub => sub.receive { 
-        (buffer, offset, length, header) => 
-          val data = new Array[Byte](length)
-          buffer.getBytes(offset, data) 
-          val hello = new String(data)
-          log.info(s"Subscription handler receives message: $hello ")
-          assert("hello".equals(hello))
-          false // stop receive message while loop
-      }}
-      comm
-    }
-
-    def publicationIO(comm: Communicator) = IO.sync { 
-      comm.withPublication().publication.map { pub => pub.send (
-        messageBytes = "hello".getBytes
-      )}
-      comm
-    }
-
-    val handlers = for {
-      dio <- driverIO.fork
-      sio <- subscriptionIO(communicator).fork
-      pio <- publicationIO(communicator).fork
-      cio <- IO.sync { barrier.signal; IO.unit }.fork
-      _ <- cio.join
-      p <- pio.join
-      s <- sio.join
-      d <- dio.join
-    } yield (p, s, d)
-
-    new RTS {}.unsafeRun { 
-      handlers.run 
-    }.map { case (p, s, d) =>
-      p.close
-      s.close
-      d.map(_.close)
-    }
-    log.info("Done!")
-  }
-
-/*
-  "Communicator" should "send/ receive message." in {
-    val barrier = new ShutdownSignalBarrier
-    val f1 = Future {
-       val driver = Driver.create().start
-       barrier.await
-       driver
-    }
-    val communicator = Communicator.create()
-    val f2 = Future { 
-      communicator.withSubscription().subscription.map { sub => sub.receive { 
-        (buffer, offset, length, header) =>
-          val data = new Array[Byte](length)
-          buffer.getBytes(offset, data) 
-          val hello = new String(data)
-          log.info(s"Receive string $hello")
-          assert("hello".equals(hello))
-          false
-      }}
-      communicator
-    }
-    val f3 = Future {
-      communicator.withPublication().publication.map { pub => pub.send (
-        messageBytes = "hello".getBytes
-      )}
-      communicator
-    }
-    Thread.sleep(3 * 1000)  
-    barrier.signal  
-    f2.onComplete {
-      case Success(communicator) => communicator.close
-      case Failure(ex) => throw ex
-    }
-    f3.onComplete {
-      case Success(communicator) => communicator.close
-      case Failure(ex) => throw ex
-    }
-    f1.onComplete {
-      case Success(driver) => driver match {
-        case Left(ex) => throw ex
-        case Right(d) => d.close
-      }
-      case Failure(ex) => throw ex
-    }
-  }
-*/
-
-}
-