You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/05/10 08:25:20 UTC
[incubator-tuweni] branch master updated: Initial commit for hobbits
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new e2ddfeb Initial commit for hobbits
new 38f016c Merge pull request #14 from atoulme/hobbits
e2ddfeb is described below
commit e2ddfeb557e5be42d35e5baf5e0e02767622ef9e
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Wed May 8 22:18:30 2019 -0700
Initial commit for hobbits
---
settings.gradle => hobbits/build.gradle | 49 ++--
.../org/apache/tuweni/hobbits/HobbitsTransport.kt | 264 +++++++++++++++++++++
.../kotlin/org/apache/tuweni/hobbits/Message.kt | 93 ++++++++
.../apache/tuweni/hobbits/HobbitsTransportTest.kt | 81 +++++++
.../org/apache/tuweni/hobbits/MessageTest.kt | 37 +++
settings.gradle | 1 +
6 files changed, 492 insertions(+), 33 deletions(-)
diff --git a/settings.gradle b/hobbits/build.gradle
similarity index 55%
copy from settings.gradle
copy to hobbits/build.gradle
index 4288158..1a8eb2e 100644
--- a/settings.gradle
+++ b/hobbits/build.gradle
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
@@ -10,35 +10,18 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-rootProject.name='tuweni'
-include 'bytes'
-include 'concurrent'
-include 'concurrent-coroutines'
-include 'config'
-include 'crypto'
-include 'devp2p'
-include 'dist'
-include 'dns-discovery'
-include 'eth'
-include 'eth-reference-tests'
-include 'eth-repository'
-include 'gossip'
-include 'io'
-include 'junit'
-include 'kademlia'
-include 'kv'
-include 'les'
-include 'merkle-trie'
-include 'net'
-include 'net-coroutines'
-include 'plumtree'
-include 'progpow'
-include 'rlp'
-include 'rlpx'
-include 'scuttlebutt'
-include 'scuttlebutt-discovery'
-include 'scuttlebutt-handshake'
-include 'scuttlebutt-rpc'
-include 'ssz'
-include 'toml'
-include 'units'
+description = 'Hobbits is a lightweight wire protocol for ETH 2.0 network testing purposes.'
+
+dependencies {
+ compile project(':bytes')
+ compile project(':concurrent-coroutines')
+ compile 'io.vertx:vertx-core'
+ compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
+
+ testCompile project(':junit')
+ testCompile 'org.bouncycastle:bcprov-jdk15on'
+ testCompile 'org.junit.jupiter:junit-jupiter-api'
+ testCompile 'org.junit.jupiter:junit-jupiter-params'
+
+ testRuntime 'org.junit.jupiter:junit-jupiter-engine'
+}
diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
new file mode 100644
index 0000000..4c1d16c
--- /dev/null
+++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import io.vertx.core.Vertx
+import io.vertx.core.buffer.Buffer
+import io.vertx.core.datagram.DatagramSocket
+import io.vertx.core.http.HttpClient
+import io.vertx.core.http.HttpMethod
+import io.vertx.core.http.HttpServer
+import io.vertx.core.net.NetClient
+import io.vertx.core.net.NetServer
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.coroutines.await
+import java.lang.IllegalArgumentException
+import java.lang.IllegalStateException
+import java.lang.RuntimeException
+import java.net.URI
+import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Hobbits is a peer-to-peer transport stack specified at https://www.github.com/deltap2p/hobbits.
+ *
+ * This class works as a transport mechanism that can leverage a variety of network transport
+ * mechanisms, such as TCP, HTTP, UDP and Web sockets.
+ *
+ * It can be used to contact other Hobbits endpoints, or to expose endpoints to the network.
+ *
+ */
+class HobbitsTransport(
+ private val vertx: Vertx,
+ override val coroutineContext: CoroutineContext = Dispatchers.Default
+) : CoroutineScope {
+
+ private val started = AtomicBoolean(false)
+
+ private val httpEndpoints = mutableMapOf<String, Endpoint>()
+ private val tcpEndpoints = mutableMapOf<String, Endpoint>()
+ private val udpEndpoints = mutableMapOf<String, Endpoint>()
+ private val wsEndpoints = mutableMapOf<String, Endpoint>()
+
+ private var httpClient: HttpClient? = null
+ private var tcpClient: NetClient? = null
+ private var udpClient: DatagramSocket? = null
+
+ private var httpServer: HttpServer? = null
+ private var tcpServer: NetServer? = null
+
+ /**
+ * Creates a new endpoint over http.
+ * @param networkInterface the network interface to bind the endpoint to
+ * @param port the port to serve traffic from
+ * @param tls whether the endpoint should be secured using TLS
+ */
+ fun createHTTPEndpoint(
+ id: String = "default",
+ networkInterface: String = "0.0.0.0",
+ port: Int = 9337,
+ tls: Boolean = false
+ ) {
+ checkNotStarted()
+ httpEndpoints[id] = Endpoint(networkInterface, port, tls)
+ }
+
+ /**
+ * Creates a new endpoint over tcp persistent connections.
+ * @param networkInterface the network interface to bind the endpoint to
+ * @param port the port to serve traffic from
+ * @param tls whether the endpoint should be secured using TLS
+ */
+ fun createTCPEndpoint(
+ id: String = "default",
+ networkInterface: String = "0.0.0.0",
+ port: Int = 9237,
+ tls: Boolean = false
+ ) {
+ checkNotStarted()
+ tcpEndpoints[id] = Endpoint(networkInterface, port, tls)
+ }
+
+ /**
+ * Creates a new endpoint over UDP connections.
+ * @param networkInterface the network interface to bind the endpoint to
+ * @param port the port to serve traffic from
+ * @param tls whether the endpoint should be secured using TLS
+ */
+ fun createUDPEndpoint(id: String = "default", networkInterface: String = "0.0.0.0", port: Int = 9137) {
+ checkNotStarted()
+ udpEndpoints[id] = Endpoint(networkInterface, port, false)
+ }
+
+ /**
+ * Creates a new endpoint over websocket connections.
+ * @param networkInterface the network interface to bind the endpoint to
+ * @param port the port to serve traffic from
+ * @param tls whether the endpoint should be secured using TLS
+ */
+ fun createWSEndpoint(
+ id: String = "default",
+ networkInterface: String = "0.0.0.0",
+ port: Int = 9037,
+ tls: Boolean = false
+ ) {
+ checkNotStarted()
+ wsEndpoints[id] = Endpoint(networkInterface, port, tls)
+ }
+
+ /**
+ * Sends a message using the transport specified.
+ *
+ */
+ suspend fun sendMessage(message: Message, transport: Transport, host: String, port: Int) {
+ checkStarted()
+ val completion = AsyncCompletion.incomplete()
+ when (transport) {
+ Transport.HTTP -> {
+ @Suppress("DEPRECATION")
+ val req = httpClient!!.request(HttpMethod.POST, port, host, "/").handler {
+ if (it.statusCode() == 200) {
+ completion.complete()
+ } else {
+ completion.completeExceptionally(RuntimeException())
+ }
+ }
+ req.end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
+ }
+ Transport.TCP -> {
+ tcpClient!!.connect(port, host) { res ->
+ if (res.failed()) {
+ completion.completeExceptionally(res.cause())
+ } else {
+ res.result().end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
+ completion.complete()
+ }
+ }
+ }
+ Transport.UDP -> {
+ TODO()
+ }
+ Transport.WS -> {
+ TODO()
+ }
+ }
+ completion.await()
+ }
+
+ private fun findEndpoint(endpointId: String): Endpoint? {
+ val uri = URI.create(endpointId)
+ if (uri.scheme == null) {
+ if (httpEndpoints.containsKey(endpointId)) {
+ return httpEndpoints[endpointId]
+ }
+ if (tcpEndpoints.containsKey(endpointId)) {
+ return tcpEndpoints[endpointId]
+ }
+ if (udpEndpoints.containsKey(endpointId)) {
+ return udpEndpoints[endpointId]
+ }
+ if (wsEndpoints.containsKey(endpointId)) {
+ return wsEndpoints[endpointId]
+ }
+ return null
+ }
+ when (uri.scheme) {
+ "http" -> {
+ return httpEndpoints[uri.host]
+ }
+ "tcp" -> {
+ return tcpEndpoints[uri.host]
+ }
+ "udp" -> {
+ return udpEndpoints[uri.host]
+ }
+ "ws" -> {
+ return wsEndpoints[uri.host]
+ }
+ else -> {
+ throw IllegalArgumentException("Unsupported endpoint $endpointId")
+ }
+ }
+ }
+
+ suspend fun start() {
+ if (started.compareAndSet(false, true)) {
+ httpClient = vertx.createHttpClient()
+ tcpClient = vertx.createNetClient()
+ udpClient = vertx.createDatagramSocket()
+
+ httpServer = vertx.createHttpServer()
+ tcpServer = vertx.createNetServer()
+
+ val completions = mutableListOf<AsyncCompletion>()
+ for (endpoint in httpEndpoints.values) {
+ val completion = AsyncCompletion.incomplete()
+ httpServer!!.listen(endpoint.port, endpoint.networkInterface) {
+ if (it.failed()) {
+ completion.completeExceptionally(it.cause())
+ } else {
+ completion.complete()
+ }
+ }
+ completions.add(completion)
+ }
+ for (endpoint in tcpEndpoints.values) {
+ val completion = AsyncCompletion.incomplete()
+ tcpServer!!.listen(endpoint.port, endpoint.networkInterface) {
+ if (it.failed()) {
+ completion.completeExceptionally(it.cause())
+ } else {
+ completion.complete()
+ }
+ }
+ completions.add(completion)
+ }
+ AsyncCompletion.allOf(completions).await()
+ }
+ }
+
+ fun stop() {
+ if (started.compareAndSet(true, false)) {
+ httpClient!!.close()
+ tcpClient!!.close()
+ udpClient!!.close()
+ }
+ }
+
+ private fun checkNotStarted() {
+ if (started.get()) {
+ throw IllegalStateException("Server already started")
+ }
+ }
+
+ private fun checkStarted() {
+ if (!started.get()) {
+ throw IllegalStateException("Server not started")
+ }
+ }
+}
+
+internal data class Endpoint(val networkInterface: String, val port: Int, val tls: Boolean)
+
+enum class Transport() {
+ HTTP,
+ TCP,
+ UDP,
+ WS
+}
diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
new file mode 100644
index 0000000..14c2cc3
--- /dev/null
+++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import com.google.common.base.Splitter
+import org.apache.tuweni.bytes.Bytes
+import java.nio.charset.StandardCharsets
+
+/**
+ * Hobbits message.
+ *
+ */
+class Message(
+ val protocol: String = "EWP",
+ val version: String = "0.2",
+ val command: String,
+ val headers: Bytes,
+ val body: Bytes
+) {
+
+ companion object {
+
+ /**
+ * Reads a message from a byte buffer.
+ * @param message the message bytes
+ * @return the message interpreted by the codec, or null if the message is too short.
+ */
+ @JvmStatic
+ fun readMessage(message: Bytes): Message? {
+ var requestLineBytes: Bytes? = null
+ for (i in 0 until message.size()) {
+ if (message.get(i) == '\n'.toByte()) {
+ requestLineBytes = message.slice(0, i)
+ break
+ }
+ }
+ if (requestLineBytes == null) {
+ return null
+ }
+ val requestLine = String(requestLineBytes.toArrayUnsafe(), StandardCharsets.UTF_8)
+ val segments = Splitter.on(" ").split(requestLine).iterator()
+
+ val protocol = segments.next()
+ if (!segments.hasNext()) {
+ return null
+ }
+ val version = segments.next()
+ if (!segments.hasNext()) {
+ return null
+ }
+ val command = segments.next()
+ if (!segments.hasNext()) {
+ return null
+ }
+ val headersLength = segments.next().toInt()
+ if (!segments.hasNext()) {
+ return null
+ }
+ val bodyLength = segments.next().toInt()
+
+ if (message.size() < requestLineBytes.size() + 1 + headersLength + bodyLength) {
+ return null
+ }
+
+ val headers = message.slice(requestLineBytes.size() + 1, headersLength)
+ val body = message.slice(requestLineBytes.size() + headersLength + 1, bodyLength)
+
+ return Message(protocol, version, command, headers, body)
+ }
+ }
+
+ /**
+ * Writes a message into bytes.
+ */
+ fun toBytes(): Bytes {
+ val requestLine = "$protocol $version $command ${headers.size()} ${body.size()}\n"
+ return Bytes.concatenate(Bytes.wrap(requestLine.toByteArray()), headers, body)
+ }
+}
diff --git a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
new file mode 100644
index 0000000..8ddac68
--- /dev/null
+++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import io.vertx.core.Vertx
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.concurrent.AsyncResult
+import org.apache.tuweni.concurrent.coroutines.await
+import org.apache.tuweni.junit.VertxExtension
+import org.apache.tuweni.junit.VertxInstance
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.api.extension.ExtendWith
+import java.lang.IllegalStateException
+
+@ExtendWith(VertxExtension::class)
+class HobbitsTransportTest {
+
+ @Test
+ fun testLifecycle(@VertxInstance vertx: Vertx) = runBlocking {
+ val server = HobbitsTransport(vertx)
+ server.start()
+ server.start()
+ server.stop()
+ }
+
+ @Test
+ fun sendMessageBeforeStart(@VertxInstance vertx: Vertx) = runBlocking {
+ val server = HobbitsTransport(vertx)
+ val exception: IllegalStateException = assertThrows {
+ runBlocking {
+ server.sendMessage(Message(command = "RCP", headers = Bytes.EMPTY, body = Bytes.EMPTY),
+ Transport.TCP, "localhost", 9000)
+ }
+ }
+ assertEquals("Server not started", exception.message)
+ }
+
+ @Test
+ fun registerEndpointAfterStart(@VertxInstance vertx: Vertx) = runBlocking {
+ val server = HobbitsTransport(vertx)
+ server.start()
+ val exception: IllegalStateException = assertThrows {
+ server.createHTTPEndpoint()
+ }
+ assertEquals("Server already started", exception.message)
+ }
+
+ @Test
+ fun sendMessage(@VertxInstance vertx: Vertx) = runBlocking {
+ val completion = AsyncResult.incomplete<Bytes>()
+ val listening = vertx.createNetServer()
+ listening.connectHandler {
+ it.handler {
+ completion.complete(Bytes.wrapBuffer(it))
+ }
+ }.listen(10000, "localhost")
+ val server = HobbitsTransport(vertx)
+ server.start()
+ val msg = Message(command = "RCP", headers = Bytes.EMPTY, body = Bytes.EMPTY)
+ server.sendMessage(msg, Transport.TCP, "localhost", 10000)
+ val result = completion.await()
+ assertEquals(msg.toBytes(), result)
+ }
+}
diff --git a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/MessageTest.kt b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/MessageTest.kt
new file mode 100644
index 0000000..2006827
--- /dev/null
+++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/MessageTest.kt
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import org.apache.tuweni.bytes.Bytes
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class MessageTest {
+
+ @Test
+ fun parseMessageRoundtrip() {
+ val msg = Message(command = "RPC", headers = Bytes.fromHexString("deadbeef"),
+ body = Bytes.fromHexString("deadbeef"))
+ val serialized = msg.toBytes()
+ val read = Message.readMessage(serialized)!!
+ assertEquals("EWP", read.protocol)
+ assertEquals("0.2", read.version)
+ assertEquals("RPC", read.command)
+ assertEquals(Bytes.fromHexString("deadbeef"), read.headers)
+ assertEquals(Bytes.fromHexString("deadbeef"), read.body)
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index 4288158..b198613 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,6 +23,7 @@ include 'eth'
include 'eth-reference-tests'
include 'eth-repository'
include 'gossip'
+include 'hobbits'
include 'io'
include 'junit'
include 'kademlia'
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org