You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2017/02/12 02:38:10 UTC

incubator-gossip git commit: GOSSIP-52 use one object mapper application wide

Repository: incubator-gossip
Updated Branches:
  refs/heads/master 296b55fa9 -> 21a263b07


GOSSIP-52 use one object mapper application wide


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/21a263b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/21a263b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/21a263b0

Branch: refs/heads/master
Commit: 21a263b079f57b1bc343c1f5191201f4c1b73b89
Parents: 296b55f
Author: Edward Capriolo <ed...@gmail.com>
Authored: Wed Feb 8 17:58:42 2017 -0500
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Wed Feb 8 17:58:42 2017 -0500

----------------------------------------------------------------------
 .../org/apache/gossip/manager/GossipCore.java   | 13 ++-------
 .../apache/gossip/manager/GossipManager.java    |  9 +++++-
 .../gossip/manager/PassiveGossipThread.java     | 29 ++++++--------------
 .../gossip/manager/RingStatePersister.java      | 11 ++------
 .../gossip/manager/UserDataPersister.java       | 10 +++----
 .../manager/random/RandomGossipManager.java     | 18 ++++++++++--
 6 files changed, 41 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/21a263b0/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 82d65fe..e23ee54 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -56,13 +56,10 @@ import org.apache.log4j.Logger;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 
 public class GossipCore implements GossipCoreConstants {
   
   public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
-  private static final ObjectMapper MAPPER = new ObjectMapper();
   private final GossipManager gossipManager;
   private ConcurrentHashMap<String, Base> requests;
   private ThreadPoolExecutor service;
@@ -72,11 +69,7 @@ public class GossipCore implements GossipCoreConstants {
   private final Meter messageSerdeException;
   private final Meter tranmissionException;
   private final Meter tranmissionSuccess;
-  
-  {
-    MAPPER.enableDefaultTyping();
-  }
-  
+    
   public GossipCore(GossipManager manager, MetricRegistry metrics){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
@@ -214,7 +207,7 @@ public class GossipCore implements GossipCoreConstants {
   private void sendInternal(Base message, URI uri){
     byte[] json_bytes;
     try {
-      json_bytes = MAPPER.writeValueAsString(message).getBytes();
+      json_bytes = gossipManager.getObjectMapper().writeValueAsString(message).getBytes();
     } catch (IOException e) {
       messageSerdeException.mark();
       throw new RuntimeException(e);
@@ -292,7 +285,7 @@ public class GossipCore implements GossipCoreConstants {
   public void sendOneWay(Base message, URI u){
     byte[] json_bytes;
     try {
-      json_bytes = MAPPER.writeValueAsBytes(message);
+      json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
     } catch (IOException e) {
       messageSerdeException.mark();
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/21a263b0/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 53ed8c7..67cb06b 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -18,6 +18,7 @@
 package org.apache.gossip.manager;
 
 import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -69,10 +70,11 @@ public abstract class GossipManager {
   private final MetricRegistry registry;
   private final RingStatePersister ringState;
   private final UserDataPersister userDataState;
+  private final ObjectMapper objectMapper;
   
   public GossipManager(String cluster,
           URI uri, String id, Map<String,String> properties, GossipSettings settings,
-          List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
+          List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
     this.settings = settings;
     gossipCore = new GossipCore(this, registry);
     clock = new SystemClock();
@@ -97,6 +99,7 @@ public abstract class GossipManager {
     this.registry = registry;
     this.ringState = new RingStatePersister(this);
     this.userDataState = new UserDataPersister(this, this.gossipCore);
+    this.objectMapper = objectMapper;
     readSavedRingState();
     readSavedDataState();
   }
@@ -330,5 +333,9 @@ public abstract class GossipManager {
   public Clock getClock() {
     return clock;
   }
+
+  public ObjectMapper getObjectMapper() {
+    return objectMapper;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/21a263b0/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 47b8a8f..51cf264 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -28,13 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.gossip.model.Base;
 import org.apache.log4j.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 /**
- * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
- * where this client has received an incoming message. For now, this message is always the
- * membership list, but if you choose to gossip additional information, you will need some logic to
- * determine the incoming message.
+ * This class handles the passive cycle,
+ * where this client has received an incoming message. 
  */
 abstract public class PassiveGossipThread implements Runnable {
 
@@ -42,21 +38,16 @@ abstract public class PassiveGossipThread implements Runnable {
 
   /** The socket used for the passive thread of the gossip service. */
   private final DatagramSocket server;
-
   private final AtomicBoolean keepRunning;
-
-  private final String cluster;
-  
-  private final static ObjectMapper MAPPER = new ObjectMapper();
-  
   private final GossipCore gossipCore;
-  
-  {
-    MAPPER.enableDefaultTyping();
-  }
+  private final GossipManager gossipManager;
 
   public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
+    this.gossipManager = gossipManager;
     this.gossipCore = gossipCore;
+    if (gossipManager.getMyself().getClusterName() == null){
+      throw new IllegalArgumentException("Cluster was null");
+    }
     try {
       SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
               gossipManager.getMyself().getUri().getPort());
@@ -64,10 +55,6 @@ abstract public class PassiveGossipThread implements Runnable {
       LOGGER.debug("Gossip service successfully initialized on port "
               + gossipManager.getMyself().getUri().getPort());
       LOGGER.debug("I am " + gossipManager.getMyself());
-      cluster = gossipManager.getMyself().getClusterName();
-      if (cluster == null){
-        throw new IllegalArgumentException("cluster was null");
-      }
     } catch (SocketException ex) {
       LOGGER.warn(ex);
       throw new RuntimeException(ex);
@@ -84,7 +71,7 @@ abstract public class PassiveGossipThread implements Runnable {
         server.receive(p);
         debug(p.getData());
         try {
-          Base activeGossipMessage = MAPPER.readValue(p.getData(), Base.class);
+          Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
           gossipCore.receive(activeGossipMessage);
         } catch (RuntimeException ex) {//TODO trap json exception
           LOGGER.error("Unable to process message", ex);

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/21a263b0/src/main/java/org/apache/gossip/manager/RingStatePersister.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/src/main/java/org/apache/gossip/manager/RingStatePersister.java
index 24b464a..6f724e0 100644
--- a/src/main/java/org/apache/gossip/manager/RingStatePersister.java
+++ b/src/main/java/org/apache/gossip/manager/RingStatePersister.java
@@ -29,15 +29,9 @@ import java.util.NavigableSet;
 import org.apache.gossip.LocalGossipMember;
 import org.apache.log4j.Logger;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 public class RingStatePersister implements Runnable {
 
   private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class);
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-  private static final TypeReference<ArrayList<LocalGossipMember>> REF 
-    = new TypeReference<ArrayList<LocalGossipMember>>() { };
   private GossipManager parent;
   
   public RingStatePersister(GossipManager parent){
@@ -60,18 +54,19 @@ public class RingStatePersister implements Runnable {
     }
     NavigableSet<LocalGossipMember> i = parent.getMembers().keySet();
     try (FileOutputStream fos = new FileOutputStream(computeTarget())){
-      MAPPER.writeValue(fos, i);
+      parent.getObjectMapper().writeValue(fos, i);
     } catch (IOException e) {
       LOGGER.debug(e);
     }
   }
 
+  @SuppressWarnings("unchecked")
   List<LocalGossipMember> readFromDisk(){
     if (!parent.getSettings().isPersistRingState()){
       return Collections.emptyList();
     }
     try (FileInputStream fos = new FileInputStream(computeTarget())){
-      return MAPPER.readValue(fos, REF);
+      return parent.getObjectMapper().readValue(fos, ArrayList.class);
     } catch (IOException e) {
       LOGGER.debug(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/21a263b0/src/main/java/org/apache/gossip/manager/UserDataPersister.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/src/main/java/org/apache/gossip/manager/UserDataPersister.java
index c67677a..2a123e3 100644
--- a/src/main/java/org/apache/gossip/manager/UserDataPersister.java
+++ b/src/main/java/org/apache/gossip/manager/UserDataPersister.java
@@ -32,14 +32,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 public class UserDataPersister implements Runnable {
   
   private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
-  private static final ObjectMapper MAPPER = new ObjectMapper();
   private final GossipManager parent;
   private final GossipCore gossipCore; 
   
   UserDataPersister(GossipManager parent, GossipCore gossipCore){
     this.parent = parent;
     this.gossipCore = gossipCore;
-    MAPPER.enableDefaultTyping();
   }
   
   File computeSharedTarget(){
@@ -58,7 +56,7 @@ public class UserDataPersister implements Runnable {
       return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
     }
     try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
-      return MAPPER.readValue(fos, ConcurrentHashMap.class);
+      return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
     } catch (IOException e) {
       LOGGER.debug(e);
     }
@@ -70,7 +68,7 @@ public class UserDataPersister implements Runnable {
       return;
     }
     try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
-      MAPPER.writeValue(fos, gossipCore.getPerNodeData());
+      parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData());
     } catch (IOException e) {
       LOGGER.warn(e);
     }
@@ -81,7 +79,7 @@ public class UserDataPersister implements Runnable {
       return;
     }
     try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
-      MAPPER.writeValue(fos, gossipCore.getSharedData());
+      parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData());
     } catch (IOException e) {
       LOGGER.warn(e);
     }
@@ -93,7 +91,7 @@ public class UserDataPersister implements Runnable {
       return new ConcurrentHashMap<String, SharedGossipDataMessage>();
     }
     try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
-      return MAPPER.readValue(fos, ConcurrentHashMap.class);
+      return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
     } catch (IOException e) {
       LOGGER.debug(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/21a263b0/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index 4a150be..00e3378 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -18,6 +18,8 @@
 package org.apache.gossip.manager.random;
 
 import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.gossip.GossipMember;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.event.GossipListener;
@@ -44,6 +46,7 @@ public class RandomGossipManager extends GossipManager {
     private GossipListener listener;
     private MetricRegistry registry;
     private Map<String,String> properties;
+    private ObjectMapper objectMapper;
 
     private ManagerBuilder() {}
 
@@ -93,6 +96,11 @@ public class RandomGossipManager extends GossipManager {
       return this;
     }
     
+    public ManagerBuilder mapper(ObjectMapper objectMapper){
+      this.objectMapper = objectMapper;
+      return this;
+    }
+    
     public RandomGossipManager build() {
       checkArgument(id != null, "You must specify an id");
       checkArgument(cluster != null, "You must specify a cluster name");
@@ -108,12 +116,16 @@ public class RandomGossipManager extends GossipManager {
       if (gossipMembers == null) {
         gossipMembers = new ArrayList<>();
       }
-      return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry);
+      if (objectMapper == null) {
+        objectMapper = new ObjectMapper();
+        objectMapper.enableDefaultTyping();
+      }
+      return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper);
     }
   }
 
   private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties,  GossipSettings settings, 
-          List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
-    super(cluster, uri, id, properties, settings, gossipMembers, listener, registry);
+          List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
+    super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper);
   }
 }