You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/05/10 08:25:20 UTC

[incubator-tuweni] branch master updated: Initial commit for hobbits

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new e2ddfeb  Initial commit for hobbits
     new 38f016c  Merge pull request #14 from atoulme/hobbits
e2ddfeb is described below

commit e2ddfeb557e5be42d35e5baf5e0e02767622ef9e
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Wed May 8 22:18:30 2019 -0700

    Initial commit for hobbits
---
 settings.gradle => hobbits/build.gradle            |  49 ++--
 .../org/apache/tuweni/hobbits/HobbitsTransport.kt  | 264 +++++++++++++++++++++
 .../kotlin/org/apache/tuweni/hobbits/Message.kt    |  93 ++++++++
 .../apache/tuweni/hobbits/HobbitsTransportTest.kt  |  81 +++++++
 .../org/apache/tuweni/hobbits/MessageTest.kt       |  37 +++
 settings.gradle                                    |   1 +
 6 files changed, 492 insertions(+), 33 deletions(-)

diff --git a/settings.gradle b/hobbits/build.gradle
similarity index 55%
copy from settings.gradle
copy to hobbits/build.gradle
index 4288158..1a8eb2e 100644
--- a/settings.gradle
+++ b/hobbits/build.gradle
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
@@ -10,35 +10,18 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-rootProject.name='tuweni'
-include 'bytes'
-include 'concurrent'
-include 'concurrent-coroutines'
-include 'config'
-include 'crypto'
-include 'devp2p'
-include 'dist'
-include 'dns-discovery'
-include 'eth'
-include 'eth-reference-tests'
-include 'eth-repository'
-include 'gossip'
-include 'io'
-include 'junit'
-include 'kademlia'
-include 'kv'
-include 'les'
-include 'merkle-trie'
-include 'net'
-include 'net-coroutines'
-include 'plumtree'
-include 'progpow'
-include 'rlp'
-include 'rlpx'
-include 'scuttlebutt'
-include 'scuttlebutt-discovery'
-include 'scuttlebutt-handshake'
-include 'scuttlebutt-rpc'
-include 'ssz'
-include 'toml'
-include 'units'
+// Human-readable summary of the hobbits module.
+description = 'Hobbits is a lightweight wire protocol for ETH 2.0 network testing purposes.'
+
+dependencies {
+  // Dependencies of the main source set: sibling modules first, then external libraries.
+  compile project(':bytes')
+  compile project(':concurrent-coroutines')
+  compile 'io.vertx:vertx-core'
+  compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
+
+  // Dependencies needed to compile the tests.
+  testCompile project(':junit')
+  testCompile 'org.bouncycastle:bcprov-jdk15on'
+  testCompile 'org.junit.jupiter:junit-jupiter-api'
+  testCompile 'org.junit.jupiter:junit-jupiter-params'
+
+  // Only needed on the classpath when the tests run.
+  testRuntime 'org.junit.jupiter:junit-jupiter-engine'
+}
diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
new file mode 100644
index 0000000..4c1d16c
--- /dev/null
+++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import io.vertx.core.Vertx
+import io.vertx.core.buffer.Buffer
+import io.vertx.core.datagram.DatagramSocket
+import io.vertx.core.http.HttpClient
+import io.vertx.core.http.HttpMethod
+import io.vertx.core.http.HttpServer
+import io.vertx.core.net.NetClient
+import io.vertx.core.net.NetServer
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.coroutines.await
+import java.lang.IllegalArgumentException
+import java.lang.IllegalStateException
+import java.lang.RuntimeException
+import java.net.URI
+import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Hobbits is a peer-to-peer transport stack specified at https://www.github.com/deltap2p/hobbits.
+ *
+ * This class works as a transport mechanism that can leverage a variety of network transport
+ * mechanisms, such as TCP, HTTP, UDP and Web sockets.
+ *
+ * It can be used to contact other Hobbits endpoints, or to expose endpoints to the network.
+ *
+ * Endpoints can only be registered while the transport is stopped, and messages can only be sent
+ * while it is started.
+ *
+ * @param vertx the Vert.x instance used to create the network clients and servers
+ * @param coroutineContext the coroutine context of this scope
+ */
+class HobbitsTransport(
+  private val vertx: Vertx,
+  override val coroutineContext: CoroutineContext = Dispatchers.Default
+) : CoroutineScope {
+
+  private val started = AtomicBoolean(false)
+
+  // Registered endpoints, keyed by endpoint identifier, one map per kind of transport.
+  private val httpEndpoints = mutableMapOf<String, Endpoint>()
+  private val tcpEndpoints = mutableMapOf<String, Endpoint>()
+  private val udpEndpoints = mutableMapOf<String, Endpoint>()
+  private val wsEndpoints = mutableMapOf<String, Endpoint>()
+
+  // Clients and servers are created by start() and closed by stop().
+  private var httpClient: HttpClient? = null
+  private var tcpClient: NetClient? = null
+  private var udpClient: DatagramSocket? = null
+
+  private var httpServer: HttpServer? = null
+  private var tcpServer: NetServer? = null
+
+  /**
+   * Creates a new endpoint over http.
+   * @param id the identifier of the endpoint; an existing http endpoint with the same identifier is replaced
+   * @param networkInterface the network interface to bind the endpoint to
+   * @param port the port to serve traffic from
+   * @param tls whether the endpoint should be secured using TLS
+   * @throws IllegalStateException if the transport is already started
+   */
+  fun createHTTPEndpoint(
+    id: String = "default",
+    networkInterface: String = "0.0.0.0",
+    port: Int = 9337,
+    tls: Boolean = false
+  ) {
+    checkNotStarted()
+    httpEndpoints[id] = Endpoint(networkInterface, port, tls)
+  }
+
+  /**
+   * Creates a new endpoint over tcp persistent connections.
+   * @param id the identifier of the endpoint; an existing tcp endpoint with the same identifier is replaced
+   * @param networkInterface the network interface to bind the endpoint to
+   * @param port the port to serve traffic from
+   * @param tls whether the endpoint should be secured using TLS
+   * @throws IllegalStateException if the transport is already started
+   */
+  fun createTCPEndpoint(
+    id: String = "default",
+    networkInterface: String = "0.0.0.0",
+    port: Int = 9237,
+    tls: Boolean = false
+  ) {
+    checkNotStarted()
+    tcpEndpoints[id] = Endpoint(networkInterface, port, tls)
+  }
+
+  /**
+   * Creates a new endpoint over UDP connections. UDP endpoints are always registered without TLS.
+   * @param id the identifier of the endpoint; an existing UDP endpoint with the same identifier is replaced
+   * @param networkInterface the network interface to bind the endpoint to
+   * @param port the port to serve traffic from
+   * @throws IllegalStateException if the transport is already started
+   */
+  fun createUDPEndpoint(id: String = "default", networkInterface: String = "0.0.0.0", port: Int = 9137) {
+    checkNotStarted()
+    udpEndpoints[id] = Endpoint(networkInterface, port, false)
+  }
+
+  /**
+   * Creates a new endpoint over websocket connections.
+   * @param id the identifier of the endpoint; an existing websocket endpoint with the same identifier is replaced
+   * @param networkInterface the network interface to bind the endpoint to
+   * @param port the port to serve traffic from
+   * @param tls whether the endpoint should be secured using TLS
+   * @throws IllegalStateException if the transport is already started
+   */
+  fun createWSEndpoint(
+    id: String = "default",
+    networkInterface: String = "0.0.0.0",
+    port: Int = 9037,
+    tls: Boolean = false
+  ) {
+    checkNotStarted()
+    wsEndpoints[id] = Endpoint(networkInterface, port, tls)
+  }
+
+  /**
+   * Sends a message using the transport specified.
+   *
+   * Over HTTP the message is POSTed to `/` and the call suspends until a response is received;
+   * any status code other than 200 is reported as a failure. Over TCP the call suspends until the
+   * connection is established and the message has been handed to the socket.
+   *
+   * @param message the message to send
+   * @param transport the transport to send the message over
+   * @param host the host to send the message to
+   * @param port the port to send the message to
+   * @throws IllegalStateException if the transport is not started
+   * @throws NotImplementedError if the transport is UDP or WS, which are not implemented yet
+   */
+  suspend fun sendMessage(message: Message, transport: Transport, host: String, port: Int) {
+    checkStarted()
+    val completion = AsyncCompletion.incomplete()
+    when (transport) {
+      Transport.HTTP -> {
+        val client = checkNotNull(httpClient) { "HTTP client is not available" }
+        @Suppress("DEPRECATION")
+        val req = client.request(HttpMethod.POST, port, host, "/").handler { response ->
+          if (response.statusCode() == 200) {
+            completion.complete()
+          } else {
+            completion.completeExceptionally(
+              RuntimeException("Unexpected HTTP status code ${response.statusCode()} from $host:$port")
+            )
+          }
+        }
+        // Without an exception handler a connection failure never completes the completion
+        // and the caller would stay suspended forever.
+        req.exceptionHandler { completion.completeExceptionally(it) }
+        req.end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
+      }
+      Transport.TCP -> {
+        val client = checkNotNull(tcpClient) { "TCP client is not available" }
+        client.connect(port, host) { res ->
+          if (res.failed()) {
+            completion.completeExceptionally(res.cause())
+          } else {
+            // The socket is ended right after the write: one connection carries one message.
+            res.result().end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
+            completion.complete()
+          }
+        }
+      }
+      Transport.UDP -> {
+        TODO()
+      }
+      Transport.WS -> {
+        TODO()
+      }
+    }
+    completion.await()
+  }
+
+  /**
+   * Looks up a registered endpoint.
+   *
+   * When the identifier has no URI scheme, it is searched as a plain endpoint identifier among
+   * the http, tcp, udp and websocket endpoints, in that order. Otherwise the scheme selects the
+   * kind of endpoint and the host part of the URI is used as the endpoint identifier.
+   *
+   * @param endpointId an endpoint identifier, or a URI such as `tcp://default`
+   * @return the matching endpoint, or null if none is registered
+   * @throws IllegalArgumentException if the identifier is not a valid URI or its scheme is not supported
+   */
+  private fun findEndpoint(endpointId: String): Endpoint? {
+    val uri = URI.create(endpointId)
+    return when (uri.scheme) {
+      null ->
+        httpEndpoints[endpointId]
+          ?: tcpEndpoints[endpointId]
+          ?: udpEndpoints[endpointId]
+          ?: wsEndpoints[endpointId]
+      "http" -> httpEndpoints[uri.host]
+      "tcp" -> tcpEndpoints[uri.host]
+      "udp" -> udpEndpoints[uri.host]
+      "ws" -> wsEndpoints[uri.host]
+      else -> throw IllegalArgumentException("Unsupported endpoint $endpointId")
+    }
+  }
+
+  /**
+   * Starts the transport: creates the HTTP, TCP and UDP clients and the HTTP and TCP servers,
+   * asks the servers to listen on every registered http and tcp endpoint, and suspends until
+   * all of those listen calls have completed. A failed listen completes the awaited completion
+   * exceptionally.
+   *
+   * Calling this method on a transport that is already started does nothing.
+   *
+   * NOTE(review): UDP and websocket endpoints are registered but never bound here, and no
+   * request or connect handler is installed on the servers before `listen` is called -
+   * confirm this is intended.
+   */
+  suspend fun start() {
+    if (started.compareAndSet(false, true)) {
+      httpClient = vertx.createHttpClient()
+      tcpClient = vertx.createNetClient()
+      udpClient = vertx.createDatagramSocket()
+
+      val http = vertx.createHttpServer()
+      val tcp = vertx.createNetServer()
+      httpServer = http
+      tcpServer = tcp
+
+      val completions = mutableListOf<AsyncCompletion>()
+      for (endpoint in httpEndpoints.values) {
+        val completion = AsyncCompletion.incomplete()
+        http.listen(endpoint.port, endpoint.networkInterface) {
+          if (it.failed()) {
+            completion.completeExceptionally(it.cause())
+          } else {
+            completion.complete()
+          }
+        }
+        completions.add(completion)
+      }
+      for (endpoint in tcpEndpoints.values) {
+        val completion = AsyncCompletion.incomplete()
+        tcp.listen(endpoint.port, endpoint.networkInterface) {
+          if (it.failed()) {
+            completion.completeExceptionally(it.cause())
+          } else {
+            completion.complete()
+          }
+        }
+        completions.add(completion)
+      }
+      AsyncCompletion.allOf(completions).await()
+    }
+  }
+
+  /**
+   * Stops the transport, closing the clients and the servers created by [start].
+   *
+   * Calling this method on a transport that is not started does nothing.
+   */
+  fun stop() {
+    if (started.compareAndSet(true, false)) {
+      httpClient?.close()
+      tcpClient?.close()
+      udpClient?.close()
+      // The servers must be closed too, otherwise the ports they are bound to stay in use.
+      httpServer?.close()
+      tcpServer?.close()
+    }
+  }
+
+  private fun checkNotStarted() {
+    if (started.get()) {
+      throw IllegalStateException("Server already started")
+    }
+  }
+
+  private fun checkStarted() {
+    if (!started.get()) {
+      throw IllegalStateException("Server not started")
+    }
+  }
+}
+
+/**
+ * Settings of an endpoint exposed by the transport.
+ *
+ * @param networkInterface the network interface the endpoint binds to
+ * @param port the port the endpoint serves traffic from
+ * @param tls whether the endpoint is secured using TLS
+ */
+internal data class Endpoint(
+  val networkInterface: String,
+  val port: Int,
+  val tls: Boolean
+)
+
+/**
+ * Network transports a Hobbits message can be sent over.
+ */
+enum class Transport {
+  /** Plain HTTP requests. */
+  HTTP,
+
+  /** TCP connections. */
+  TCP,
+
+  /** UDP datagrams. */
+  UDP,
+
+  /** Web sockets. */
+  WS
+}
diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
new file mode 100644
index 0000000..14c2cc3
--- /dev/null
+++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import com.google.common.base.Splitter
+import org.apache.tuweni.bytes.Bytes
+import java.nio.charset.StandardCharsets
+
+/**
+ * Hobbits message.
+ *
+ * A message is serialized as a request line made of the protocol, the version, the command, the
+ * length of the headers and the length of the body, separated by single spaces and terminated by
+ * a line feed. The request line is immediately followed by the headers bytes, then the body bytes.
+ *
+ * @param protocol the protocol of the message
+ * @param version the version of the protocol
+ * @param command the command of the message
+ * @param headers the raw headers of the message
+ * @param body the raw body of the message
+ */
+class Message(
+  val protocol: String = "EWP",
+  val version: String = "0.2",
+  val command: String,
+  val headers: Bytes,
+  val body: Bytes
+) {
+
+  companion object {
+
+    /**
+     * Reads a message from a byte buffer.
+     *
+     * Bytes present after the end of the body are ignored.
+     *
+     * @param message the message bytes
+     * @return the message interpreted by the codec, or null if the message is too short or its
+     * request line is malformed (missing segments, or lengths that are not non-negative integers).
+     */
+    @JvmStatic
+    fun readMessage(message: Bytes): Message? {
+      // The request line ends at the first line feed; without one the message is incomplete.
+      val requestLineLength = (0 until message.size()).firstOrNull { message.get(it) == '\n'.toByte() }
+        ?: return null
+      val requestLine = String(message.slice(0, requestLineLength).toArrayUnsafe(), StandardCharsets.UTF_8)
+      val segments = Splitter.on(" ").split(requestLine).toList()
+      if (segments.size < 5) {
+        return null
+      }
+      val (protocol, version, command) = segments
+
+      // The lengths come from the network: reject anything that is not a non-negative integer
+      // instead of letting a parsing or slicing exception escape.
+      val headersLength = segments[3].toIntOrNull() ?: return null
+      val bodyLength = segments[4].toIntOrNull() ?: return null
+      if (headersLength < 0 || bodyLength < 0) {
+        return null
+      }
+
+      // Skip the line feed that terminates the request line.
+      val headersStart = requestLineLength + 1
+      // Summed as a Long so that huge advertised lengths cannot overflow and pass the check.
+      val requiredSize = headersStart.toLong() + headersLength + bodyLength
+      if (message.size() < requiredSize) {
+        return null
+      }
+
+      val headers = message.slice(headersStart, headersLength)
+      val body = message.slice(headersStart + headersLength, bodyLength)
+
+      return Message(protocol, version, command, headers, body)
+    }
+  }
+
+  /**
+   * Writes a message into bytes.
+   *
+   * @return the request line encoded as UTF-8, followed by the headers and the body
+   */
+  fun toBytes(): Bytes {
+    val requestLine = "$protocol $version $command ${headers.size()} ${body.size()}\n"
+    return Bytes.concatenate(Bytes.wrap(requestLine.toByteArray(StandardCharsets.UTF_8)), headers, body)
+  }
+}
diff --git a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
new file mode 100644
index 0000000..8ddac68
--- /dev/null
+++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import io.vertx.core.Vertx
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.concurrent.AsyncResult
+import org.apache.tuweni.concurrent.coroutines.await
+import org.apache.tuweni.junit.VertxExtension
+import org.apache.tuweni.junit.VertxInstance
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.api.extension.ExtendWith
+import java.lang.IllegalStateException
+
+/**
+ * Tests of the lifecycle of [HobbitsTransport] and of sending messages over TCP.
+ */
+@ExtendWith(VertxExtension::class)
+class HobbitsTransportTest {
+
+  /** Starting twice then stopping must not fail. */
+  @Test
+  fun testLifecycle(@VertxInstance vertx: Vertx) = runBlocking {
+    val server = HobbitsTransport(vertx)
+    server.start()
+    server.start()
+    server.stop()
+  }
+
+  /** Sending a message is rejected while the transport is not started. */
+  @Test
+  fun sendMessageBeforeStart(@VertxInstance vertx: Vertx) = runBlocking {
+    val server = HobbitsTransport(vertx)
+    val exception: IllegalStateException = assertThrows {
+      // assertThrows takes a regular lambda, so the suspending call needs its own runBlocking.
+      runBlocking {
+        server.sendMessage(Message(command = "RCP", headers = Bytes.EMPTY, body = Bytes.EMPTY),
+          Transport.TCP, "localhost", 9000)
+      }
+    }
+    assertEquals("Server not started", exception.message)
+  }
+
+  /** Registering an endpoint is rejected once the transport is started. */
+  @Test
+  fun registerEndpointAfterStart(@VertxInstance vertx: Vertx) = runBlocking {
+    val server = HobbitsTransport(vertx)
+    server.start()
+    try {
+      val exception: IllegalStateException = assertThrows {
+        server.createHTTPEndpoint()
+      }
+      assertEquals("Server already started", exception.message)
+    } finally {
+      server.stop()
+    }
+  }
+
+  /** A message sent over TCP is received byte for byte by a listening server. */
+  @Test
+  fun sendMessage(@VertxInstance vertx: Vertx) = runBlocking {
+    val completion = AsyncResult.incomplete<Bytes>()
+    val listeningReady = AsyncResult.incomplete<Boolean>()
+    val listening = vertx.createNetServer()
+    listening.connectHandler { socket ->
+      socket.handler { buffer ->
+        completion.complete(Bytes.wrapBuffer(buffer))
+      }
+    }.listen(10000, "localhost") { result ->
+      if (result.failed()) {
+        listeningReady.completeExceptionally(result.cause())
+      } else {
+        listeningReady.complete(true)
+      }
+    }
+    val server = HobbitsTransport(vertx)
+    try {
+      // Binding is asynchronous: wait for it, otherwise the connection below may be refused.
+      listeningReady.await()
+      server.start()
+      val msg = Message(command = "RCP", headers = Bytes.EMPTY, body = Bytes.EMPTY)
+      server.sendMessage(msg, Transport.TCP, "localhost", 10000)
+      val result = completion.await()
+      assertEquals(msg.toBytes(), result)
+    } finally {
+      server.stop()
+      listening.close()
+    }
+  }
+}
diff --git a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/MessageTest.kt b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/MessageTest.kt
new file mode 100644
index 0000000..2006827
--- /dev/null
+++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/MessageTest.kt
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import org.apache.tuweni.bytes.Bytes
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+/**
+ * Tests of the serialization of [Message].
+ */
+class MessageTest {
+
+  /** A message written to bytes and read back keeps all of its fields. */
+  @Test
+  fun parseMessageRoundtrip() {
+    val payload = Bytes.fromHexString("deadbeef")
+    val original = Message(command = "RPC", headers = payload, body = payload)
+
+    val parsed = Message.readMessage(original.toBytes())!!
+
+    assertEquals("EWP", parsed.protocol)
+    assertEquals("0.2", parsed.version)
+    assertEquals("RPC", parsed.command)
+    assertEquals(Bytes.fromHexString("deadbeef"), parsed.headers)
+    assertEquals(Bytes.fromHexString("deadbeef"), parsed.body)
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index 4288158..b198613 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,6 +23,7 @@ include 'eth'
 include 'eth-reference-tests'
 include 'eth-repository'
 include 'gossip'
+include 'hobbits'
 include 'io'
 include 'junit'
 include 'kademlia'


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org