You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2017/10/26 13:39:55 UTC

svn commit: r1813400 - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/standby/store/ test/java/org/apache/jackrabbit/oak/segment/standby/store/

Author: frm
Date: Thu Oct 26 13:39:55 2017
New Revision: 1813400

URL: http://svn.apache.org/viewvc?rev=1813400&view=rev
Log:
OAK-5521 - Make CommunicationObserver and CommunicationPartnerMBean thread safe

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java?rev=1813400&r1=1813399&r2=1813400&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java Thu Oct 26 13:39:55 2017
@@ -20,115 +20,170 @@
 package org.apache.jackrabbit.oak.segment.standby.store;
 
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
 import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean;
+import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CommunicationObserver {
 
-    static final int MAX_CLIENT_STATISTICS = 10;
+    private static final int DEFAULT_MAX_CLIENT_MBEANS = 10;
 
     private static final Logger log = LoggerFactory.getLogger(CommunicationObserver.class);
 
-    private final Map<String, CommunicationPartnerMBean> partnerDetails = new HashMap<>();
+    private static ObjectName getMBeanName(CommunicationPartnerMBean bean) throws MalformedObjectNameException {
+        return new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + bean.getName() + "\"");
+    }
+
+    private static String oldest(Map<String, CommunicationPartnerMBean> beans) {
+        CommunicationPartnerMBean oldest = null;
+
+        for (CommunicationPartnerMBean bean : beans.values()) {
+            if (oldest == null || oldest.getLastSeen().after(bean.getLastSeen())) {
+                oldest = bean;
+            }
+        }
+
+        if (oldest == null) {
+            throw new IllegalArgumentException("no clients available");
+        }
+
+        return oldest.getName();
+    }
+
+    private final Map<String, CommunicationPartnerMBean> beans = new HashMap<>();
+
+    private final int maxClientMBeans;
 
     private final String id;
 
     public CommunicationObserver(String id) {
-        this.id = id;
+        this(id, DEFAULT_MAX_CLIENT_MBEANS);
     }
 
-    void unregisterCommunicationPartner(CommunicationPartnerMBean m) throws Exception {
-        ManagementFactory.getPlatformMBeanServer().unregisterMBean(m.getMBeanName());
+    CommunicationObserver(String id, int maxClientMBeans) {
+        this.id = id;
+        this.maxClientMBeans = maxClientMBeans;
     }
 
     void registerCommunicationPartner(CommunicationPartnerMBean m) throws Exception {
-        ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
+        ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), getMBeanName(m));
     }
 
-    private void safeUnregisterCommunicationPartner(CommunicationPartnerMBean m) {
+    private void safeRegisterCommunicationPartner(CommunicationPartnerMBean m) {
         try {
-            unregisterCommunicationPartner(m);
+            registerCommunicationPartner(m);
         } catch (Exception e) {
-            log.error(String.format("Unable to unregister MBean for client %s", m.getName()), e);
+            log.error(String.format("Unable to register MBean for client %s", m.getName()), e);
         }
     }
 
-    private void safeRegisterCommunicationPartner(CommunicationPartnerMBean m) {
+    void unregisterCommunicationPartner(CommunicationPartnerMBean m) throws Exception {
+        ManagementFactory.getPlatformMBeanServer().unregisterMBean(getMBeanName(m));
+    }
+
+    private void safeUnregisterCommunicationPartner(CommunicationPartnerMBean m) {
         try {
-            registerCommunicationPartner(m);
+            unregisterCommunicationPartner(m);
         } catch (Exception e) {
-            log.error(String.format("Unable to register MBean for client %s", m.getName()), e);
+            log.error(String.format("Unable to unregister MBean for client %s", m.getName()), e);
         }
     }
 
     public void unregister() {
-        for (CommunicationPartnerMBean m : partnerDetails.values()) {
-            safeUnregisterCommunicationPartner(m);
+        Collection<CommunicationPartnerMBean> unregister;
+
+        synchronized (beans) {
+            unregister = new ArrayList<>(beans.values());
+            beans.clear();
+        }
+
+        for (CommunicationPartnerMBean bean : unregister) {
+            safeUnregisterCommunicationPartner(bean);
         }
     }
 
     public void gotMessageFrom(String client, String request, String address, int port) throws MalformedObjectNameException {
         log.debug("Message '{}' received from client {}", request, client);
-        CommunicationPartnerMBean m = partnerDetails.get(client);
-        boolean register = false;
-        if (m == null) {
-            cleanUp();
-            m = new CommunicationPartnerMBean(client);
-            m.setRemoteAddress(address);
-            m.setRemotePort(port);
-            register = true;
-        }
-        m.setLastSeen(new Date());
-        m.setLastRequest(request);
-        partnerDetails.put(client, m);
-        if (register) {
-            safeRegisterCommunicationPartner(m);
-        }
+        createOrUpdateClientMBean(client, address, port, m -> m.onMessageReceived(new Date(), request));
     }
 
     public void didSendSegmentBytes(String client, int size) {
         log.debug("Segment with size {} sent to client {}", size, client);
-        CommunicationPartnerMBean m = partnerDetails.get(client);
-        m.onSegmentSent(size);
-        partnerDetails.put(client, m);
+        updateClientMBean(client, m -> m.onSegmentSent(size));
     }
 
     public void didSendBinariesBytes(String client, long size) {
         log.debug("Binary with size {} sent to client {}", size, client);
-        CommunicationPartnerMBean m = partnerDetails.get(client);
-        m.onBinarySent(size);
-        partnerDetails.put(client, m);
+        updateClientMBean(client, m -> m.onBinarySent(size));
     }
 
     public String getID() {
         return id;
     }
 
-    private void cleanUp() {
-        while (partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
-            CommunicationPartnerMBean oldestEntry = oldestEntry();
-            log.info("Housekeeping: Removing statistics for client " + oldestEntry.getName());
-            safeUnregisterCommunicationPartner(oldestEntry);
-            partnerDetails.remove(oldestEntry.getName());
+    private void createOrUpdateClientMBean(String clientName, String remoteAddress, int remotePort, Consumer<CommunicationPartnerMBean> update) throws MalformedObjectNameException {
+        List<CommunicationPartnerMBean> unregister = null;
+        boolean register = false;
+        CommunicationPartnerMBean bean;
+
+        synchronized (beans) {
+            bean = beans.get(clientName);
+
+            if (bean == null) {
+                bean = new CommunicationPartnerMBean(clientName, remoteAddress, remotePort);
+
+                while (beans.size() >= maxClientMBeans) {
+                    if (unregister == null) {
+                        unregister = new ArrayList<>();
+                    }
+                    unregister.add(beans.remove(oldest(beans)));
+                }
+
+                beans.put(clientName, bean);
+
+                register = true;
+            }
         }
-    }
 
-    private CommunicationPartnerMBean oldestEntry() {
-        CommunicationPartnerMBean ret = null;
-        for (CommunicationPartnerMBean m : partnerDetails.values()) {
-            if (ret == null || ret.getLastSeen().after(m.getLastSeen())) {
-                ret = m;
+        update.accept(bean);
+
+        if (register) {
+            safeRegisterCommunicationPartner(bean);
+        }
+
+        if (unregister != null) {
+            for (CommunicationPartnerMBean c : unregister) {
+                safeUnregisterCommunicationPartner(c);
             }
         }
-        return ret;
+    }
+
+    private void updateClientMBean(String id, Consumer<CommunicationPartnerMBean> update) {
+        CommunicationPartnerMBean c;
+
+        synchronized (beans) {
+            c = beans.get(id);
+        }
+
+        if (c == null) {
+            throw new IllegalStateException("no client found with id " + id);
+        }
+
+        update.accept(c);
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java?rev=1813400&r1=1813399&r2=1813400&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java Thu Oct 26 13:39:55 2017
@@ -18,45 +18,37 @@
 package org.apache.jackrabbit.oak.segment.standby.store;
 
 import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nonnull;
 import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 
 import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean;
-import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
 
 class CommunicationPartnerMBean implements ObservablePartnerMBean {
 
-    private final ObjectName mbeanName;
-
     private final String clientName;
 
-    private String lastRequest;
-
-    private Date lastSeen;
+    private final String remoteAddress;
 
-    private String lastSeenTimestamp;
+    private final int remotePort;
 
-    private String remoteAddress;
+    private final AtomicLong segmentsSent = new AtomicLong();
 
-    private int remotePort;
+    private final AtomicLong segmentBytesSent = new AtomicLong();
 
-    private long segmentsSent;
+    private final AtomicLong binariesSent = new AtomicLong();
 
-    private long segmentBytesSent;
+    private final AtomicLong binariesBytesSent = new AtomicLong();
 
-    private long binariesSent;
+    private volatile String lastRequest;
 
-    private long binariesBytesSent;
+    private volatile Date lastSeen;
 
-    CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
+    CommunicationPartnerMBean(String clientName, String remoteAddress, int remotePort) throws MalformedObjectNameException {
         this.clientName = clientName;
-        this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
-    }
-
-    ObjectName getMBeanName() {
-        return this.mbeanName;
+        this.remoteAddress = remoteAddress;
+        this.remotePort = remotePort;
     }
 
     @Nonnull
@@ -82,58 +74,48 @@ class CommunicationPartnerMBean implemen
 
     @Override
     public String getLastSeenTimestamp() {
-        return this.lastSeenTimestamp;
+        return this.lastSeen.toString();
     }
 
     @Override
     public long getTransferredSegments() {
-        return this.segmentsSent;
+        return this.segmentsSent.get();
     }
 
     @Override
     public long getTransferredSegmentBytes() {
-        return this.segmentBytesSent;
+        return this.segmentBytesSent.get();
     }
 
     @Override
     public long getTransferredBinaries() {
-        return this.binariesSent;
+        return this.binariesSent.get();
     }
 
     @Override
     public long getTransferredBinariesBytes() {
-        return this.binariesBytesSent;
-    }
-
-    void setRemoteAddress(String remoteAddress) {
-        this.remoteAddress = remoteAddress;
-    }
-
-    void setRemotePort(int remotePort) {
-        this.remotePort = remotePort;
-    }
-
-    Date getLastSeen() {
-        return lastSeen;
+        return this.binariesBytesSent.get();
     }
 
-    void setLastSeen(Date lastSeen) {
-        this.lastSeen = lastSeen;
-        this.lastSeenTimestamp = lastSeen.toString();
-    }
-
-    void setLastRequest(String lastRequest) {
-        this.lastRequest = lastRequest;
+    void onMessageReceived(Date lastSeen, String lastRequest) {
+        synchronized (this) {
+            this.lastSeen = lastSeen;
+            this.lastRequest = lastRequest;
+        }
     }
 
     void onSegmentSent(long bytes) {
-        segmentsSent++;
-        segmentBytesSent += bytes;
+        this.segmentsSent.incrementAndGet();
+        this.segmentBytesSent.addAndGet(bytes);
     }
 
     void onBinarySent(long bytes) {
-        binariesSent++;
-        binariesBytesSent += bytes;
+        this.binariesSent.incrementAndGet();
+        this.binariesBytesSent.addAndGet(bytes);
+    }
+
+    Date getLastSeen() {
+        return this.lastSeen;
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java?rev=1813400&r1=1813399&r2=1813400&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java Thu Oct 26 13:39:55 2017
@@ -36,8 +36,8 @@ public class CommunicationObserverTest {
 
         private final List<CommunicationPartnerMBean> communicationPartners = new ArrayList<>();
 
-        TestCommunicationObserver(String id) {
-            super(id);
+        TestCommunicationObserver(String id, int maxClientMBeans) {
+            super(id, maxClientMBeans);
         }
 
         @Override
@@ -58,7 +58,7 @@ public class CommunicationObserverTest {
 
     @Before
     public void before() {
-        observer = new TestCommunicationObserver("test");
+        observer = new TestCommunicationObserver("test", 10);
     }
 
     @After
@@ -80,10 +80,10 @@ public class CommunicationObserverTest {
 
     @Test
     public void shouldNotKeepManyObservablePartnerMBeans() throws Exception {
-        for (int i = 0; i < CommunicationObserver.MAX_CLIENT_STATISTICS * 2; i++) {
+        for (int i = 0; i < 20; i++) {
             observer.gotMessageFrom(randomUUID().toString(), "request", "127.0.0.1", 8080);
         }
-        assertEquals(CommunicationObserver.MAX_CLIENT_STATISTICS, observer.communicationPartners.size());
+        assertEquals(10, observer.communicationPartners.size());
     }
 
     @Test