You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by gm...@apache.org on 2019/09/23 15:37:35 UTC

[incubator-tuweni] branch master updated: Scuttlebutt: add functionality for querying peers and the social graph

This is an automated email from the ASF dual-hosted git repository.

gmartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 33ffaa7  Scuttlebutt: add functionality for querying peers and the social graph
     new 980a7ac  Merge pull request #32 from Happy0/peers
33ffaa7 is described below

commit 33ffaa79b392971487a0cae2db454d09084492ec
Author: Gordon Martin <go...@gmail.com>
AuthorDate: Thu Sep 19 16:13:20 2019 +0100

    Scuttlebutt: add functionality for querying peers and the social graph
---
 .../apache/tuweni/scuttlebutt/lib/FeedService.java |  10 +-
 .../tuweni/scuttlebutt/lib/NetworkService.java     | 102 +++++++-
 .../tuweni/scuttlebutt/lib/ScuttlebuttClient.java  |  17 +-
 .../tuweni/scuttlebutt/lib/SocialService.java      | 269 +++++++++++++++++++++
 .../lib/model/{StreamHandler.java => Peer.java}    |  54 +++--
 .../{StreamHandler.java => PeerStateChange.java}   |  46 ++--
 .../lib/model/{StreamHandler.java => Profile.java} |  39 +--
 .../scuttlebutt/lib/model/StreamHandler.java       |   3 -
 .../{StreamHandler.java => UpdateNameMessage.java} |  45 ++--
 .../{StreamHandler.java => query/AboutQuery.java}  |  39 +--
 .../AboutQueryResponse.java}                       |  32 ++-
 .../IsFollowingQuery.java}                         |  39 +--
 .../IsFollowingResponse.java}                      |  46 ++--
 .../WhoAmIResponse.java}                           |  29 +--
 .../tuweni/scuttlebutt/lib/FeedStreamTest.java     |   2 +-
 .../tuweni/scuttlebutt/lib/PeerInfoTest.java       | 131 ++++++++++
 .../tuweni/scuttlebutt/lib/SocialServiceTest.java  | 120 +++++++++
 .../apache/tuweni/scuttlebutt/rpc/RPCResponse.java |   6 +
 18 files changed, 856 insertions(+), 173 deletions(-)

diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/FeedService.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/FeedService.java
index b49e3c3..1e93420 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/FeedService.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/FeedService.java
@@ -27,6 +27,7 @@ import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedExceptio
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.function.Function;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -83,18 +84,23 @@ public final class FeedService {
   /**
    * Streams every message in the instance's database.
    *
-   * @param handler the handler for processing the streamed messages
+   * @param streamHandler a function that can be used to construct the handler for processing the streamed messages,
+   *        using a runnable which can be run to close the stream early.
    * @throws JsonProcessingException if the request to open the stream could not be made due to a JSON marshalling
    *         error.
    *
    * @throws ConnectionClosedException if the stream could not be started because the connection is no longer open.
    */
-  public void createFeedStream(StreamHandler<FeedMessage> handler) throws JsonProcessingException,
+  public void createFeedStream(Function<Runnable, StreamHandler<FeedMessage>> streamHandler)
+      throws JsonProcessingException,
       ConnectionClosedException {
 
     RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createFeedStream"), Arrays.asList());
 
     multiplexer.openStream(streamRequest, (closer) -> new ScuttlebuttStreamHandler() {
+
+      StreamHandler<FeedMessage> handler = streamHandler.apply(closer);
+
       @Override
       public void onMessage(RPCResponse message) {
         try {
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/NetworkService.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/NetworkService.java
index 9a9f461..007ba7d 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/NetworkService.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/NetworkService.java
@@ -15,24 +15,45 @@ package org.apache.tuweni.scuttlebutt.lib;
 import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.scuttlebutt.Invite;
 import org.apache.tuweni.scuttlebutt.MalformedInviteCodeException;
+import org.apache.tuweni.scuttlebutt.lib.model.Peer;
+import org.apache.tuweni.scuttlebutt.lib.model.PeerStateChange;
+import org.apache.tuweni.scuttlebutt.lib.model.StreamHandler;
 import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
 import org.apache.tuweni.scuttlebutt.rpc.RPCFunction;
 import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
+import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
 import org.apache.tuweni.scuttlebutt.rpc.mux.Multiplexer;
+import org.apache.tuweni.scuttlebutt.rpc.mux.ScuttlebuttStreamHandler;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
 
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * A service for operations that connect nodes together and other network related operations
- *
+ * <p>
+ * Assumes the standard 'ssb-gossip' plugin is installed and enabled on the node that we're connected to (or that RPC
+ * functions meeting its manifest's contract are available).
+ * <p>
  * Should not be constructed directly, should be used via an ScuttlebuttClient instance.
  */
 public class NetworkService {
 
   private final Multiplexer multiplexer;
 
+  // We don't represent all the fields returned over RPC in our java classes, so we configure the mapper
+  // to ignore JSON fields without a corresponding Java field
+  private final ObjectMapper mapper =
+      new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
   protected NetworkService(Multiplexer multiplexer) {
     this.multiplexer = multiplexer;
   }
@@ -66,7 +87,7 @@ public class NetworkService {
   /**
    * Redeems an invite issued by another node. If successful, the node will connect to the other node and each node will
    * befriend each other.
-   * 
+   *
    * @param invite
    * @return
    */
@@ -82,6 +103,83 @@ public class NetworkService {
     }
   }
 
+  /**
+   * Queries for the list of peers the instance is connected to.
+   *
+   * @return
+   */
+  public AsyncResult<List<Peer>> getConnectedPeers() {
+    return getAllKnownPeers().thenApply(
+        peers -> peers.stream().filter(peer -> peer.getState().equals("connected")).collect(Collectors.toList()));
+  }
+
+  /**
+   * Queries for all the peers the instance is aware of in its gossip table.
+   *
+   * @return
+   */
+  public AsyncResult<List<Peer>> getAllKnownPeers() {
+    RPCFunction function = new RPCFunction(Arrays.asList("gossip"), "peers");
+    RPCAsyncRequest request = new RPCAsyncRequest(function, Arrays.asList());
+
+    try {
+      return multiplexer.makeAsyncRequest(request).then(rpcResponse -> {
+        try {
+          List<Peer> peers = rpcResponse.asJSON(mapper, new TypeReference<List<Peer>>() {});
+          return AsyncResult.completed(peers);
+        } catch (IOException e) {
+          return AsyncResult.exceptional(e);
+        }
+      });
+    } catch (JsonProcessingException e) {
+      return AsyncResult.exceptional(e);
+    }
+  }
+
+  /**
+   * Opens a stream of peer connection state changes.
+   *
+   * @param streamHandler A function that can be invoked to instantiate a stream handler to pass events to, with a given
+   *        runnable to close the stream early
+   * @throws JsonProcessingException If the stream could not be opened due to an error serializing the request
+   * @throws ConnectionClosedException if the stream could not be opened due to the connection being closed
+   */
+  public void createChangesStream(Function<Runnable, StreamHandler<PeerStateChange>> streamHandler)
+      throws JsonProcessingException,
+      ConnectionClosedException {
+    RPCFunction function = new RPCFunction(Arrays.asList("gossip"), "changes");
+
+    RPCStreamRequest request = new RPCStreamRequest(function, Arrays.asList());
+
+    multiplexer.openStream(request, (closer) -> new ScuttlebuttStreamHandler() {
+
+      StreamHandler<PeerStateChange> changeStream = streamHandler.apply(closer);
+
+      @Override
+      public void onMessage(RPCResponse message) {
+        try {
+          PeerStateChange peerStateChange = message.asJSON(mapper, PeerStateChange.class);
+          changeStream.onMessage(peerStateChange);
+        } catch (IOException e) {
+          changeStream.onStreamError(e);
+          closer.run();
+        }
+      }
+
+      @Override
+      public void onStreamEnd() {
+        changeStream.onStreamEnd();
+      }
+
+      @Override
+      public void onStreamError(Exception ex) {
+        changeStream.onStreamError(ex);
+      }
+    });
+
+  }
+
+
   private Invite inviteFromRPCResponse(RPCResponse response) throws MalformedInviteCodeException {
     String rawInviteCode = response.asString();
     return Invite.fromCanonicalForm(rawInviteCode);
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/ScuttlebuttClient.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/ScuttlebuttClient.java
index 12d30d5..9c694aa 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/ScuttlebuttClient.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/ScuttlebuttClient.java
@@ -26,7 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 public class ScuttlebuttClient {
 
   private final Multiplexer multiplexer;
-  private final ObjectMapper mapper;
+
+  private final FeedService feedService;
 
   /**
    *
@@ -35,7 +36,7 @@ public class ScuttlebuttClient {
    */
   protected ScuttlebuttClient(Multiplexer multiplexer, ObjectMapper mapper) {
     this.multiplexer = multiplexer;
-    this.mapper = mapper;
+    this.feedService = new FeedService(multiplexer, mapper);
   }
 
   /**
@@ -53,9 +54,19 @@ public class ScuttlebuttClient {
    * @return
    */
   public FeedService getFeedService() {
-    return new FeedService(multiplexer, mapper);
+    return feedService;
+  }
+
+  /**
+   * A service for operations concerning social connections and updating the instance's profile
+   *
+   * @return
+   */
+  public SocialService getSocialService() {
+    return new SocialService(multiplexer, feedService);
   }
 
+
   /**
    * A service for making lower level requests that are not supported by higher level services.
    *
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/SocialService.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/SocialService.java
new file mode 100644
index 0000000..b996f68
--- /dev/null
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/SocialService.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.scuttlebutt.lib;
+
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.scuttlebutt.lib.model.Profile;
+import org.apache.tuweni.scuttlebutt.lib.model.UpdateNameMessage;
+import org.apache.tuweni.scuttlebutt.lib.model.query.AboutQuery;
+import org.apache.tuweni.scuttlebutt.lib.model.query.AboutQueryResponse;
+import org.apache.tuweni.scuttlebutt.lib.model.query.IsFollowingQuery;
+import org.apache.tuweni.scuttlebutt.lib.model.query.IsFollowingResponse;
+import org.apache.tuweni.scuttlebutt.lib.model.query.WhoAmIResponse;
+import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
+import org.apache.tuweni.scuttlebutt.rpc.RPCFunction;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
+import org.apache.tuweni.scuttlebutt.rpc.mux.Multiplexer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Operations for querying the follow graph, and fetching the profiles of users.
+ *
+ * Assumes that the standard 'ssb-about' and 'ssb-friends' plugins are installed on the target instance (or that RPC
+ * functions meeting their manifests' contracts are available.)
+ *
+ * Should not be instantiated directly - an instance should be acquired via the ScuttlebuttClient instance
+ *
+ */
+public class SocialService {
+
+  private final Multiplexer multiplexer;
+  private final FeedService feedService;
+
+  private ObjectMapper mapper = new ObjectMapper();
+
+  protected SocialService(Multiplexer multiplexer, FeedService feedService) {
+    this.multiplexer = multiplexer;
+    this.feedService = feedService;
+  }
+
+  /**
+   * Get the instance's public key (the key used for its identity.)
+   *
+   * @return
+   */
+  public AsyncResult<String> getOwnIdentity() {
+    RPCFunction function = new RPCFunction("whoami");
+
+    RPCAsyncRequest request = new RPCAsyncRequest(function, Arrays.asList());
+
+    try {
+      return multiplexer.makeAsyncRequest(request).then(response -> {
+        try {
+          return AsyncResult.completed(response.asJSON(mapper, WhoAmIResponse.class).getId());
+        } catch (IOException e) {
+          return AsyncResult.exceptional(e);
+        }
+      });
+    } catch (JsonProcessingException e) {
+      return AsyncResult.exceptional(e);
+    }
+  }
+
+  /**
+   * Get the instance's current profile
+   *
+   * @return
+   */
+  public AsyncResult<Profile> getOwnProfile() {
+    return getOwnIdentity().then(identity -> getProfile(identity));
+  }
+
+  /**
+   * Get the profiles of all the users that the instance is following.
+   *
+   * @return
+   */
+  public AsyncResult<List<Profile>> getFollowing() {
+    return getHops().then(followHops -> {
+      List<String> following =
+          followHops.keySet().stream().filter(key -> followHops.get(key) == 1).collect(Collectors.toList());
+
+      return getProfiles(following);
+    });
+  }
+
+  /**
+   * Get the profiles of all the instances that are following the instance.
+   *
+   * @return
+   */
+  public AsyncResult<List<Profile>> getFollowedBy() {
+
+    return getOwnIdentity().then(ownIdentity -> getHops().then(hops -> {
+      Set<String> identities = hops.keySet();
+
+      List<AsyncResult<IsFollowingResponse>> results =
+          identities.stream().map((ident) -> isFollowing(ident, ownIdentity)).collect(Collectors.toList());
+
+      AsyncResult<List<IsFollowingResponse>> allResults = AsyncResult.combine(results);
+
+      AsyncResult<List<String>> ids = allResults.thenApply(
+          queryResults -> queryResults
+              .stream()
+              .filter(result -> result.isFollowing())
+              .map(IsFollowingResponse::getSource)
+              .collect(Collectors.toList()));
+
+      return ids.then(this::getProfiles);
+    }));
+  }
+
+  /**
+   * Get the profiles of all the users that the instance is following that also follow the instance.
+   *
+   * @return
+   */
+  public AsyncResult<List<Profile>> getFriends() {
+
+    return getOwnIdentity().then(ident -> getFollowing().then(following -> {
+
+      List<AsyncResult<IsFollowingResponse>> responses =
+          following.stream().map((follow) -> isFollowing(follow.getKey(), ident)).collect(Collectors.toList());
+
+      return AsyncResult.combine(responses).then(response -> {
+        List<AsyncResult<Profile>> profiles =
+            response.stream().filter(f -> f.isFollowing()).map(item -> getProfile(item.getSource())).collect(
+                Collectors.toList());
+
+        return AsyncResult.combine(profiles);
+      });
+
+    }));
+
+  }
+
+  /**
+   * Gets the profile of a given user
+   *
+   * @param publicKey the public key of the user to get the profile of
+   * @return
+   */
+  public AsyncResult<Profile> getProfile(String publicKey) {
+
+    RPCFunction function = new RPCFunction(Arrays.asList("about"), "latestValues");
+    AboutQuery query = new AboutQuery(publicKey, Arrays.asList("name"));
+    RPCAsyncRequest rpcAsyncRequest = new RPCAsyncRequest(function, Arrays.asList(query));
+
+    try {
+      AsyncResult<RPCResponse> rpcResponseAsyncResult = multiplexer.makeAsyncRequest(rpcAsyncRequest);
+
+      return rpcResponseAsyncResult.then(rpcResponse -> {
+        try {
+          AboutQueryResponse aboutQueryResponse = rpcResponse.asJSON(mapper, AboutQueryResponse.class);
+          return AsyncResult.completed(new Profile(publicKey, aboutQueryResponse.getName()));
+        } catch (IOException e) {
+          return AsyncResult.exceptional(e);
+        }
+      });
+
+    } catch (JsonProcessingException e) {
+      return AsyncResult.exceptional(e);
+    }
+  }
+
+  /**
+   * Set the display name of the instance by posting an 'about' message to the feed.
+   *
+   * @param displayName the instance's new display name
+   * @return the new profile after setting the display name
+   */
+  public AsyncResult<Profile> setDisplayName(String displayName) {
+
+    return getOwnIdentity().then(ownId -> {
+      try {
+        return feedService.publish(new UpdateNameMessage(displayName, ownId)).then(feedMessage -> getProfile(ownId));
+      } catch (JsonProcessingException e) {
+        return AsyncResult.exceptional(e);
+      }
+    });
+  }
+
+  /**
+   * A map of all the instance IDs to how many hops away they are in the social graph.
+   *
+   * @return
+   */
+  private AsyncResult<Map<String, Integer>> getHops() {
+    RPCFunction rpcFunction = new RPCFunction(Arrays.asList("friends"), "hops");
+    RPCAsyncRequest rpcAsyncRequest = new RPCAsyncRequest(rpcFunction, Arrays.asList());
+
+    try {
+      AsyncResult<RPCResponse> rpcResponseAsyncResult = multiplexer.makeAsyncRequest(rpcAsyncRequest);
+
+      return rpcResponseAsyncResult.then(rpcResponse -> {
+
+        try {
+          Map<String, Integer> followHops = rpcResponse.asJSON(mapper, new TypeReference<Map<String, Integer>>() {});
+          return AsyncResult.completed(followHops);
+        } catch (IOException e) {
+          return AsyncResult.exceptional(e);
+        }
+      });
+
+    } catch (JsonProcessingException e) {
+      return AsyncResult.exceptional(e);
+    }
+  }
+
+  /**
+   * Queries whether a given user (source) is following another given user (destination)
+   *
+   * @param source the node we want to check is following the destination
+   * @param destination the destination node
+   * @return
+   */
+  private AsyncResult<IsFollowingResponse> isFollowing(String source, String destination) {
+    RPCFunction function = new RPCFunction(Arrays.asList("friends"), "isFollowing");
+
+    RPCAsyncRequest rpcAsyncRequest =
+        new RPCAsyncRequest(function, Arrays.asList(new IsFollowingQuery(source, destination)));
+
+    try {
+      return multiplexer.makeAsyncRequest(rpcAsyncRequest).then(rpcResponse -> {
+        try {
+          boolean answer = rpcResponse.asJSON(mapper, Boolean.class);
+          return AsyncResult.completed(new IsFollowingResponse(source, destination, answer));
+        } catch (IOException e) {
+          return AsyncResult.exceptional(e);
+        }
+      });
+    } catch (JsonProcessingException e) {
+      return AsyncResult.exceptional(e);
+    }
+  }
+
+  /**
+   * Fetches the profiles of the given list of users.
+   *
+   * @param keys the users to get the profiles of
+   * @return
+   */
+  private AsyncResult<List<Profile>> getProfiles(List<String> keys) {
+
+    List<AsyncResult<Profile>> asyncResultStream = keys.stream().map(this::getProfile).collect(Collectors.toList());
+
+    return AsyncResult.combine(asyncResultStream);
+  }
+
+}
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/Peer.java
similarity index 52%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/Peer.java
index fce546a..78e8535 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/Peer.java
@@ -12,28 +12,46 @@
  */
 package org.apache.tuweni.scuttlebutt.lib.model;
 
+public class Peer {
 
-public interface StreamHandler<T> {
+  private String state;
+  private String address;
+  private int port;
+  private String key;
 
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
+  public Peer() {}
 
   /**
-   * Invoked when the stream has been closed.
-   */
-  void onStreamEnd();
-
-  /**
-   * Invoked when there is an error in the stream.
    *
-   * @param ex the underlying error
+   * @param address the address of the peer used to connect to it
+   * @param port the port of the peer
+   * @param key the public key of the peer
+   * @param state the connection state of the peer
    */
-  void onStreamError(Exception ex);
-
-
-
+  public Peer(String address, int port, String key, String state) {
+    this.address = address;
+    this.port = port;
+    this.key = key;
+    this.state = state;
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public String getState() {
+    if (state == null) {
+      return "unknown";
+    } else {
+      return state;
+    }
+  }
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/PeerStateChange.java
similarity index 59%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/PeerStateChange.java
index fce546a..3aeca8c 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/PeerStateChange.java
@@ -12,28 +12,40 @@
  */
 package org.apache.tuweni.scuttlebutt.lib.model;
 
+public class PeerStateChange {
 
-public interface StreamHandler<T> {
+  private Peer peer;
+  private String type;
 
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
-
-  /**
-   * Invoked when the stream has been closed.
+  /*
+    * Empty constructor for serialization
    */
-  void onStreamEnd();
+  public PeerStateChange() {}
 
   /**
-   * Invoked when there is an error in the stream.
+   * A change in peer state.
    *
-   * @param ex the underlying error
+   * @param type the state change since the previous state
+   * @param peer the new peer details
    */
-  void onStreamError(Exception ex);
-
-
-
+  public PeerStateChange(String type, Peer peer) {
+    this.type = type;
+    this.peer = peer;
+  }
+
+  public Peer getPeer() {
+    return peer;
+  }
+
+  public void setPeer(Peer peer) {
+    this.peer = peer;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/Profile.java
similarity index 65%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/Profile.java
index fce546a..dad41e4 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/Profile.java
@@ -12,28 +12,33 @@
  */
 package org.apache.tuweni.scuttlebutt.lib.model;
 
+/**
+ * Represents a user profile.
+ */
+public class Profile {
 
-public interface StreamHandler<T> {
+  private String key;
+  private String displayName;
 
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
+  public Profile() {
 
-  /**
-   * Invoked when the stream has been closed.
-   */
-  void onStreamEnd();
+  }
 
   /**
-   * Invoked when there is an error in the stream.
    *
-   * @param ex the underlying error
+   * @param key the public key of the user
+   * @param displayName the display name of the user
    */
-  void onStreamError(Exception ex);
-
-
-
+  public Profile(String key, String displayName) {
+    this.key = key;
+    this.displayName = displayName;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public String getDisplayName() {
+    return displayName;
+  }
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
index fce546a..da874a7 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
@@ -33,7 +33,4 @@ public interface StreamHandler<T> {
    * @param ex the underlying error
    */
   void onStreamError(Exception ex);
-
-
-
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/UpdateNameMessage.java
similarity index 56%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/UpdateNameMessage.java
index fce546a..46b3760 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/UpdateNameMessage.java
@@ -12,28 +12,37 @@
  */
 package org.apache.tuweni.scuttlebutt.lib.model;
 
+/**
+ * A message that when persisted to the feed updates the name of the given user
+ */
+public class UpdateNameMessage implements ScuttlebuttMessageContent {
 
-public interface StreamHandler<T> {
-
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
+  private String about;
+  public String type = "about";
+  public String name = "name";
 
-  /**
-   * Invoked when the stream has been closed.
-   */
-  void onStreamEnd();
+  public UpdateNameMessage() {}
 
   /**
-   * Invoked when there is an error in the stream.
    *
-   * @param ex the underlying error
+   * @param name the new name for the user
+   * @param about the public key that the new name should be applied to
    */
-  void onStreamError(Exception ex);
-
-
-
+  public UpdateNameMessage(String name, String about) {
+    this.name = name;
+    this.about = about;
+  }
+
+  public String getAbout() {
+    return about;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/AboutQuery.java
similarity index 58%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/AboutQuery.java
index fce546a..798894a 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/AboutQuery.java
@@ -10,30 +10,31 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package org.apache.tuweni.scuttlebutt.lib.model;
+package org.apache.tuweni.scuttlebutt.lib.model.query;
 
+import java.util.List;
 
-public interface StreamHandler<T> {
+public class AboutQuery {
 
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
+  private List<String> keys;
+  private String dest;
 
-  /**
-   * Invoked when the stream has been closed.
-   */
-  void onStreamEnd();
+  public AboutQuery() {}
 
   /**
-   * Invoked when there is an error in the stream.
-   *
-   * @param ex the underlying error
+   * @param dest the object that the 'about' type message refers to (e.g. user profile / message ID.)
+   * @param keys The keys for the 'about' type messages that we're querying
    */
-  void onStreamError(Exception ex);
-
-
-
+  public AboutQuery(String dest, List<String> keys) {
+    this.keys = keys;
+    this.dest = dest;
+  }
+
+  public List<String> getKeys() {
+    return keys;
+  }
+
+  public String getDest() {
+    return dest;
+  }
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/AboutQueryResponse.java
similarity index 66%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/AboutQueryResponse.java
index fce546a..4827cf2 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/AboutQueryResponse.java
@@ -10,30 +10,26 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package org.apache.tuweni.scuttlebutt.lib.model;
+package org.apache.tuweni.scuttlebutt.lib.model.query;
 
+/**
+ * A response to querying for the profile details of a user.
+ */
+public class AboutQueryResponse {
 
-public interface StreamHandler<T> {
+  private String name;
 
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
+  public AboutQueryResponse() {}
 
   /**
-   * Invoked when the stream has been closed.
-   */
-  void onStreamEnd();
-
-  /**
-   * Invoked when there is an error in the stream.
    *
-   * @param ex the underlying error
+   * @param name the display name of the user
    */
-  void onStreamError(Exception ex);
-
-
+  public AboutQueryResponse(String name) {
+    this.name = name;
+  }
 
+  public String getName() {
+    return name;
+  }
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/IsFollowingQuery.java
similarity index 60%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/IsFollowingQuery.java
index fce546a..96e793e 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/IsFollowingQuery.java
@@ -10,30 +10,31 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package org.apache.tuweni.scuttlebutt.lib.model;
+package org.apache.tuweni.scuttlebutt.lib.model.query;
 
+public class IsFollowingQuery {
 
-public interface StreamHandler<T> {
+  private String source;
+  private String dest;
 
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
+  public IsFollowingQuery() {}
 
   /**
-   * Invoked when the stream has been closed.
-   */
-  void onStreamEnd();
-
-  /**
-   * Invoked when there is an error in the stream.
+   * A query body to check whether the 'source' is following the 'destination' user.
    *
-   * @param ex the underlying error
+   * @param source the source to check
+   * @param dest the target node
    */
-  void onStreamError(Exception ex);
-
-
-
+  public IsFollowingQuery(String source, String dest) {
+    this.source = source;
+    this.dest = dest;
+  }
+
+  public String getSource() {
+    return source;
+  }
+
+  public String getDest() {
+    return dest;
+  }
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/IsFollowingResponse.java
similarity index 50%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/IsFollowingResponse.java
index fce546a..95f2c0b 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/IsFollowingResponse.java
@@ -10,30 +10,38 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package org.apache.tuweni.scuttlebutt.lib.model;
+package org.apache.tuweni.scuttlebutt.lib.model.query;
 
+public class IsFollowingResponse {
 
-public interface StreamHandler<T> {
+  private String destination;
+  private String source;
+  private boolean following;
 
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
+  public IsFollowingResponse() {}
 
   /**
-   * Invoked when the stream has been closed.
-   */
-  void onStreamEnd();
-
-  /**
-   * Invoked when there is an error in the stream.
+   * A response to a query on whether 'source' is following 'destination'
    *
-   * @param ex the underlying error
+   * @param source the source node
+   * @param destination the destination node
+   * @param following true if source is following destination, false otherwise
    */
-  void onStreamError(Exception ex);
-
-
-
+  public IsFollowingResponse(String source, String destination, boolean following) {
+    this.source = source;
+    this.destination = destination;
+    this.following = following;
+  }
+
+  public String getDestination() {
+    return destination;
+  }
+
+  public String getSource() {
+    return source;
+  }
+
+  public boolean isFollowing() {
+    return following;
+  }
 }
diff --git a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/WhoAmIResponse.java
similarity index 67%
copy from scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
copy to scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/WhoAmIResponse.java
index fce546a..407cb34 100644
--- a/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/StreamHandler.java
+++ b/scuttlebutt-client-lib/src/main/java/org/apache/tuweni/scuttlebutt/lib/model/query/WhoAmIResponse.java
@@ -10,30 +10,25 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package org.apache.tuweni.scuttlebutt.lib.model;
+package org.apache.tuweni.scuttlebutt.lib.model.query;
 
+public class WhoAmIResponse {
 
-public interface StreamHandler<T> {
+  private String id;
 
-  /**
-   * Handles a new item from the result stream.
-   *
-   * @param item
-   */
-  void onMessage(T item);
+  public WhoAmIResponse() {
 
-  /**
-   * Invoked when the stream has been closed.
-   */
-  void onStreamEnd();
+  }
 
   /**
-   * Invoked when there is an error in the stream.
    *
-   * @param ex the underlying error
+   * @param id the ID of the current user in response to a 'whoami' query
    */
-  void onStreamError(Exception ex);
-
-
+  public WhoAmIResponse(String id) {
+    this.id = id;
+  }
 
+  public String getId() {
+    return id;
+  }
 }
diff --git a/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/FeedStreamTest.java b/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/FeedStreamTest.java
index 9785800..a66edde 100644
--- a/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/FeedStreamTest.java
+++ b/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/FeedStreamTest.java
@@ -66,7 +66,7 @@ public class FeedStreamTest {
 
     CompletableAsyncResult<Optional<FeedMessage>> lastMessage = AsyncResult.incomplete();
 
-    feedService.createFeedStream(new StreamHandler<FeedMessage>() {
+    feedService.createFeedStream((closer) -> new StreamHandler<FeedMessage>() {
 
       Optional<FeedMessage> currentMessage = Optional.empty();
 
diff --git a/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/PeerInfoTest.java b/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/PeerInfoTest.java
new file mode 100644
index 0000000..8550ad6
--- /dev/null
+++ b/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/PeerInfoTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.scuttlebutt.lib;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
+import org.apache.tuweni.scuttlebutt.Invite;
+import org.apache.tuweni.scuttlebutt.MalformedInviteCodeException;
+import org.apache.tuweni.scuttlebutt.lib.model.Peer;
+import org.apache.tuweni.scuttlebutt.lib.model.PeerStateChange;
+import org.apache.tuweni.scuttlebutt.lib.model.StreamHandler;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for streaming peer state changes and listing connected peers through the network service.
+ */
+public class PeerInfoTest {
+
+  private AsyncResult<Void> peerWithNodeUsingInviteCode(ScuttlebuttClient client) throws MalformedInviteCodeException,
+      Exception {
+    String inviteCode = System.getenv("ssb_invite_code");
+    // A missing invite code is reported through the returned result instead of being thrown here.
+    if (inviteCode == null) {
+      return AsyncResult.exceptional(
+          new Exception("Test requires an 'ssb_invite_code' environment variable with a valid ssb invite code"));
+    } else {
+      return client.getNetworkService().redeemInviteCode(Invite.fromCanonicalForm(inviteCode));
+    }
+  }
+
+
+  @Test
+  @Disabled("Requires a scuttlebutt backend")
+  public void testPeerStream() throws Exception, MalformedInviteCodeException {
+    TestConfig config = TestConfig.fromEnvironment();
+
+    AsyncResult<ScuttlebuttClient> scuttlebuttClientLibAsyncResult =
+        ScuttlebuttClientFactory.fromNet(new ObjectMapper(), config.getHost(), config.getPort(), config.getKeyPair());
+
+    ScuttlebuttClient scuttlebuttClient = scuttlebuttClientLibAsyncResult.get();
+
+    CompletableAsyncResult<PeerStateChange> incomplete = AsyncResult.incomplete();
+    // Subscribe before peering; 'incomplete' is completed with the first state change that arrives.
+    scuttlebuttClient.getNetworkService().createChangesStream((closer) -> new StreamHandler<PeerStateChange>() {
+      @Override
+      public void onMessage(PeerStateChange item) {
+        if (!incomplete.isDone()) {
+          incomplete.complete(item);
+        }
+        // One item is all this test needs, so invoke the closer supplied with the stream.
+        closer.run();
+      }
+
+      @Override
+      public void onStreamEnd() {
+
+        if (!incomplete.isDone()) {
+          incomplete.completeExceptionally(new Exception("Stream closed before any items were pushed."));
+        }
+
+      }
+
+      @Override
+      public void onStreamError(Exception ex) {
+        incomplete.completeExceptionally(ex);
+      }
+    });
+
+    // Peer with a node (when an invite code is configured), presumably so there is peer activity to observe.
+
+    AsyncResult<Void> inviteRedeemed = peerWithNodeUsingInviteCode(scuttlebuttClient);
+
+    try {
+      inviteRedeemed.get();
+    } catch (Exception ex) {
+      // Not fatal, since we may already have peered
+      System.out.println("Exception while redeeming invite code: " + ex.getMessage());
+    }
+    // Wait at most 10 seconds so that a silent backend fails the test quickly.
+    PeerStateChange peerStateChange = incomplete.get(10, TimeUnit.SECONDS);
+
+    String changeType = peerStateChange.getType();
+
+    assertTrue(changeType != null);
+  }
+
+  @Test
+  @Disabled("Requires a scuttlebutt backend")
+  public void getConnectedPeersTest() throws Exception, MalformedInviteCodeException {
+    TestConfig config = TestConfig.fromEnvironment();
+
+    AsyncResult<ScuttlebuttClient> scuttlebuttClientLibAsyncResult =
+        ScuttlebuttClientFactory.fromNet(new ObjectMapper(), config.getHost(), config.getPort(), config.getKeyPair());
+
+    ScuttlebuttClient scuttlebuttClient = scuttlebuttClientLibAsyncResult.get();
+    // Try to peer with a node first, presumably so that there is at least one connection to report.
+    AsyncResult<Void> asyncResult = peerWithNodeUsingInviteCode(scuttlebuttClient);
+
+    try {
+      asyncResult.get();
+    } catch (Exception ex) {
+      // Not fatal, since we may already have peered
+      System.out.println("Exception while redeeming invite code: " + ex.getMessage());
+    }
+
+    AsyncResult<List<Peer>> connectedPeers = scuttlebuttClient.getNetworkService().getConnectedPeers();
+    // NOTE(review): the query above has already been issued when this pause starts - confirm the intended order.
+    Thread.sleep(10000);
+
+    assertTrue(!connectedPeers.get().isEmpty());
+  }
+
+}
diff --git a/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/SocialServiceTest.java b/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/SocialServiceTest.java
new file mode 100644
index 0000000..a850d4d
--- /dev/null
+++ b/scuttlebutt-client-lib/src/test/java/org/apache/tuweni/scuttlebutt/lib/SocialServiceTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.scuttlebutt.lib;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.crypto.sodium.Signature;
+import org.apache.tuweni.scuttlebutt.lib.model.Profile;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+public class SocialServiceTest {
+
+  @Test
+  @Disabled("Requires an ssb instance")
+  public void testViewFollowingWithPatchwork() throws Exception {
+
+    // Connect to the ssb instance on localhost:8008 with the keys supplied by KeyFileLoader.
+    Signature.KeyPair keyPair = KeyFileLoader.getLocalKeys();
+    ObjectMapper mapper = new ObjectMapper();
+    ScuttlebuttClient ssbClient = ScuttlebuttClientFactory.fromNet(mapper, "localhost", 8008, keyPair).get();
+    SocialService social = ssbClient.getSocialService();
+
+    // Wait at most 10 seconds for the list of followed profiles.
+    List<Profile> followed = social.getFollowing().get(10, TimeUnit.SECONDS);
+
+    assertTrue(!followed.isEmpty());
+
+    for (Profile followedProfile : followed) {
+      System.out.println(followedProfile.getDisplayName());
+    }
+
+    System.out.println("Following: " + followed.size());
+  }
+
+  @Test
+  @Disabled("Requires an ssb instance")
+  public void testViewFollowedByPatchwork() throws Exception {
+    // Connect to the ssb instance on localhost:8008 with the keys supplied by KeyFileLoader.
+    Signature.KeyPair keyPair = KeyFileLoader.getLocalKeys();
+    ObjectMapper mapper = new ObjectMapper();
+    ScuttlebuttClient ssbClient = ScuttlebuttClientFactory.fromNet(mapper, "localhost", 8008, keyPair).get();
+    SocialService social = ssbClient.getSocialService();
+
+    // Wait at most 30 seconds for the list of profiles that follow us.
+    List<Profile> followers = social.getFollowedBy().get(30, TimeUnit.SECONDS);
+
+    assertTrue(!followers.isEmpty());
+
+    for (Profile follower : followers) {
+      System.out.println(follower.getDisplayName());
+    }
+
+    System.out.println("Followed by: " + followers.size());
+  }
+
+  @Test
+  @Disabled("Requires an ssb instance")
+  public void testFriendsWithPatchwork() throws Exception {
+    // Connect to the ssb instance on localhost:8008 with the keys supplied by KeyFileLoader.
+    Signature.KeyPair keyPair = KeyFileLoader.getLocalKeys();
+    ObjectMapper mapper = new ObjectMapper();
+    ScuttlebuttClient ssbClient = ScuttlebuttClientFactory.fromNet(mapper, "localhost", 8008, keyPair).get();
+    SocialService social = ssbClient.getSocialService();
+
+    // Wait at most 30 seconds for the list of friends.
+    List<Profile> friendProfiles = social.getFriends().get(30, TimeUnit.SECONDS);
+
+    assertTrue(!friendProfiles.isEmpty());
+
+    for (Profile friend : friendProfiles) {
+      System.out.println(friend.getDisplayName());
+    }
+
+    System.out.println("Friends: " + friendProfiles.size());
+  }
+
+  @Test
+  @Disabled("Requires an ssb instance")
+  public void testSetDisplayName() throws Exception {
+
+    TestConfig testConfig = TestConfig.fromEnvironment();
+
+    AsyncResult<ScuttlebuttClient> scuttlebuttClientLibAsyncResult = ScuttlebuttClientFactory
+        .fromNet(new ObjectMapper(), testConfig.getHost(), testConfig.getPort(), testConfig.getKeyPair());
+
+    ScuttlebuttClient scuttlebuttClient = scuttlebuttClientLibAsyncResult.get();
+
+    SocialService socialService = scuttlebuttClient.getSocialService();
+
+    String newDisplayName = "Test display name";
+    // The profile returned by the update should already carry the new name.
+    Profile profile = socialService.setDisplayName(newDisplayName).get();
+
+    assertEquals(newDisplayName, profile.getDisplayName());
+    // Reading the own profile back should yield the same name (expected value goes first in assertEquals).
+    Profile ownProfile = socialService.getOwnProfile().get();
+
+    assertEquals(newDisplayName, ownProfile.getDisplayName());
+  }
+
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java
index 73c6e86..a5bb494 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java
@@ -19,6 +19,7 @@ import org.apache.tuweni.scuttlebutt.rpc.RPCFlag.BodyType;
 
 import java.io.IOException;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
@@ -77,4 +78,9 @@ public class RPCResponse {
     return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe());
   }
 
+  /** Reads the response body with {@code objectMapper} into the type described by {@code typeReference}. */
+  public <T> T asJSON(ObjectMapper objectMapper, TypeReference<T> typeReference) throws IOException {
+    return objectMapper.readerFor(typeReference).readValue(body().toArrayUnsafe());
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org