You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2017/10/26 13:39:55 UTC
svn commit: r1813400 - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/standby/store/
test/java/org/apache/jackrabbit/oak/segment/standby/store/
Author: frm
Date: Thu Oct 26 13:39:55 2017
New Revision: 1813400
URL: http://svn.apache.org/viewvc?rev=1813400&view=rev
Log:
OAK-5521 - Make CommunicationObserver and CommunicationPartnerMBean thread safe
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java?rev=1813400&r1=1813399&r2=1813400&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java Thu Oct 26 13:39:55 2017
@@ -20,115 +20,170 @@
package org.apache.jackrabbit.oak.segment.standby.store;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import javax.management.StandardMBean;
+import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean;
+import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CommunicationObserver {
- static final int MAX_CLIENT_STATISTICS = 10;
+ private static final int DEFAULT_MAX_CLIENT_MBEANS = 10;
private static final Logger log = LoggerFactory.getLogger(CommunicationObserver.class);
- private final Map<String, CommunicationPartnerMBean> partnerDetails = new HashMap<>();
+ private static ObjectName getMBeanName(CommunicationPartnerMBean bean) throws MalformedObjectNameException {
+ return new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + bean.getName() + "\"");
+ }
+
+ private static String oldest(Map<String, CommunicationPartnerMBean> beans) {
+ CommunicationPartnerMBean oldest = null;
+
+ for (CommunicationPartnerMBean bean : beans.values()) {
+ if (oldest == null || oldest.getLastSeen().after(bean.getLastSeen())) {
+ oldest = bean;
+ }
+ }
+
+ if (oldest == null) {
+ throw new IllegalArgumentException("no clients available");
+ }
+
+ return oldest.getName();
+ }
+
+ private final Map<String, CommunicationPartnerMBean> beans = new HashMap<>();
+
+ private final int maxClientMBeans;
private final String id;
public CommunicationObserver(String id) {
- this.id = id;
+ this(id, DEFAULT_MAX_CLIENT_MBEANS);
}
- void unregisterCommunicationPartner(CommunicationPartnerMBean m) throws Exception {
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(m.getMBeanName());
+ CommunicationObserver(String id, int maxClientMBeans) {
+ this.id = id;
+ this.maxClientMBeans = maxClientMBeans;
}
void registerCommunicationPartner(CommunicationPartnerMBean m) throws Exception {
- ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
+ ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), getMBeanName(m));
}
- private void safeUnregisterCommunicationPartner(CommunicationPartnerMBean m) {
+ private void safeRegisterCommunicationPartner(CommunicationPartnerMBean m) {
try {
- unregisterCommunicationPartner(m);
+ registerCommunicationPartner(m);
} catch (Exception e) {
- log.error(String.format("Unable to unregister MBean for client %s", m.getName()), e);
+ log.error(String.format("Unable to register MBean for client %s", m.getName()), e);
}
}
- private void safeRegisterCommunicationPartner(CommunicationPartnerMBean m) {
+ void unregisterCommunicationPartner(CommunicationPartnerMBean m) throws Exception {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(getMBeanName(m));
+ }
+
+ private void safeUnregisterCommunicationPartner(CommunicationPartnerMBean m) {
try {
- registerCommunicationPartner(m);
+ unregisterCommunicationPartner(m);
} catch (Exception e) {
- log.error(String.format("Unable to register MBean for client %s", m.getName()), e);
+ log.error(String.format("Unable to unregister MBean for client %s", m.getName()), e);
}
}
public void unregister() {
- for (CommunicationPartnerMBean m : partnerDetails.values()) {
- safeUnregisterCommunicationPartner(m);
+ Collection<CommunicationPartnerMBean> unregister;
+
+ synchronized (beans) {
+ unregister = new ArrayList<>(beans.values());
+ beans.clear();
+ }
+
+ for (CommunicationPartnerMBean bean : unregister) {
+ safeUnregisterCommunicationPartner(bean);
}
}
public void gotMessageFrom(String client, String request, String address, int port) throws MalformedObjectNameException {
log.debug("Message '{}' received from client {}", request, client);
- CommunicationPartnerMBean m = partnerDetails.get(client);
- boolean register = false;
- if (m == null) {
- cleanUp();
- m = new CommunicationPartnerMBean(client);
- m.setRemoteAddress(address);
- m.setRemotePort(port);
- register = true;
- }
- m.setLastSeen(new Date());
- m.setLastRequest(request);
- partnerDetails.put(client, m);
- if (register) {
- safeRegisterCommunicationPartner(m);
- }
+ createOrUpdateClientMBean(client, address, port, m -> m.onMessageReceived(new Date(), request));
}
public void didSendSegmentBytes(String client, int size) {
log.debug("Segment with size {} sent to client {}", size, client);
- CommunicationPartnerMBean m = partnerDetails.get(client);
- m.onSegmentSent(size);
- partnerDetails.put(client, m);
+ updateClientMBean(client, m -> m.onSegmentSent(size));
}
public void didSendBinariesBytes(String client, long size) {
log.debug("Binary with size {} sent to client {}", size, client);
- CommunicationPartnerMBean m = partnerDetails.get(client);
- m.onBinarySent(size);
- partnerDetails.put(client, m);
+ updateClientMBean(client, m -> m.onBinarySent(size));
}
public String getID() {
return id;
}
- private void cleanUp() {
- while (partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
- CommunicationPartnerMBean oldestEntry = oldestEntry();
- log.info("Housekeeping: Removing statistics for client " + oldestEntry.getName());
- safeUnregisterCommunicationPartner(oldestEntry);
- partnerDetails.remove(oldestEntry.getName());
+ private void createOrUpdateClientMBean(String clientName, String remoteAddress, int remotePort, Consumer<CommunicationPartnerMBean> update) throws MalformedObjectNameException {
+ List<CommunicationPartnerMBean> unregister = null;
+ boolean register = false;
+ CommunicationPartnerMBean bean;
+
+ synchronized (beans) {
+ bean = beans.get(clientName);
+
+ if (bean == null) {
+ bean = new CommunicationPartnerMBean(clientName, remoteAddress, remotePort);
+
+ while (beans.size() >= maxClientMBeans) {
+ if (unregister == null) {
+ unregister = new ArrayList<>();
+ }
+ unregister.add(beans.remove(oldest(beans)));
+ }
+
+ beans.put(clientName, bean);
+
+ register = true;
+ }
}
- }
- private CommunicationPartnerMBean oldestEntry() {
- CommunicationPartnerMBean ret = null;
- for (CommunicationPartnerMBean m : partnerDetails.values()) {
- if (ret == null || ret.getLastSeen().after(m.getLastSeen())) {
- ret = m;
+ update.accept(bean);
+
+ if (register) {
+ safeRegisterCommunicationPartner(bean);
+ }
+
+ if (unregister != null) {
+ for (CommunicationPartnerMBean c : unregister) {
+ safeUnregisterCommunicationPartner(c);
}
}
- return ret;
+ }
+
+ private void updateClientMBean(String id, Consumer<CommunicationPartnerMBean> update) {
+ CommunicationPartnerMBean c;
+
+ synchronized (beans) {
+ c = beans.get(id);
+ }
+
+ if (c == null) {
+ throw new IllegalStateException("no client found with id " + id);
+ }
+
+ update.accept(c);
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java?rev=1813400&r1=1813399&r2=1813400&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationPartnerMBean.java Thu Oct 26 13:39:55 2017
@@ -18,45 +18,37 @@
package org.apache.jackrabbit.oak.segment.standby.store;
import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean;
-import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
class CommunicationPartnerMBean implements ObservablePartnerMBean {
- private final ObjectName mbeanName;
-
private final String clientName;
- private String lastRequest;
-
- private Date lastSeen;
+ private final String remoteAddress;
- private String lastSeenTimestamp;
+ private final int remotePort;
- private String remoteAddress;
+ private final AtomicLong segmentsSent = new AtomicLong();
- private int remotePort;
+ private final AtomicLong segmentBytesSent = new AtomicLong();
- private long segmentsSent;
+ private final AtomicLong binariesSent = new AtomicLong();
- private long segmentBytesSent;
+ private final AtomicLong binariesBytesSent = new AtomicLong();
- private long binariesSent;
+ private volatile String lastRequest;
- private long binariesBytesSent;
+ private volatile Date lastSeen;
- CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
+ CommunicationPartnerMBean(String clientName, String remoteAddress, int remotePort) throws MalformedObjectNameException {
this.clientName = clientName;
- this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
- }
-
- ObjectName getMBeanName() {
- return this.mbeanName;
+ this.remoteAddress = remoteAddress;
+ this.remotePort = remotePort;
}
@Nonnull
@@ -82,58 +74,48 @@ class CommunicationPartnerMBean implemen
@Override
public String getLastSeenTimestamp() {
- return this.lastSeenTimestamp;
+ return this.lastSeen.toString();
}
@Override
public long getTransferredSegments() {
- return this.segmentsSent;
+ return this.segmentsSent.get();
}
@Override
public long getTransferredSegmentBytes() {
- return this.segmentBytesSent;
+ return this.segmentBytesSent.get();
}
@Override
public long getTransferredBinaries() {
- return this.binariesSent;
+ return this.binariesSent.get();
}
@Override
public long getTransferredBinariesBytes() {
- return this.binariesBytesSent;
- }
-
- void setRemoteAddress(String remoteAddress) {
- this.remoteAddress = remoteAddress;
- }
-
- void setRemotePort(int remotePort) {
- this.remotePort = remotePort;
- }
-
- Date getLastSeen() {
- return lastSeen;
+ return this.binariesBytesSent.get();
}
- void setLastSeen(Date lastSeen) {
- this.lastSeen = lastSeen;
- this.lastSeenTimestamp = lastSeen.toString();
- }
-
- void setLastRequest(String lastRequest) {
- this.lastRequest = lastRequest;
+ void onMessageReceived(Date lastSeen, String lastRequest) {
+ synchronized (this) {
+ this.lastSeen = lastSeen;
+ this.lastRequest = lastRequest;
+ }
}
void onSegmentSent(long bytes) {
- segmentsSent++;
- segmentBytesSent += bytes;
+ this.segmentsSent.incrementAndGet();
+ this.segmentBytesSent.addAndGet(bytes);
}
void onBinarySent(long bytes) {
- binariesSent++;
- binariesBytesSent += bytes;
+ this.binariesSent.incrementAndGet();
+ this.binariesBytesSent.addAndGet(bytes);
+ }
+
+ Date getLastSeen() {
+ return this.lastSeen;
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java?rev=1813400&r1=1813399&r2=1813400&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserverTest.java Thu Oct 26 13:39:55 2017
@@ -36,8 +36,8 @@ public class CommunicationObserverTest {
private final List<CommunicationPartnerMBean> communicationPartners = new ArrayList<>();
- TestCommunicationObserver(String id) {
- super(id);
+ TestCommunicationObserver(String id, int maxClientMBeans) {
+ super(id, maxClientMBeans);
}
@Override
@@ -58,7 +58,7 @@ public class CommunicationObserverTest {
@Before
public void before() {
- observer = new TestCommunicationObserver("test");
+ observer = new TestCommunicationObserver("test", 10);
}
@After
@@ -80,10 +80,10 @@ public class CommunicationObserverTest {
@Test
public void shouldNotKeepManyObservablePartnerMBeans() throws Exception {
- for (int i = 0; i < CommunicationObserver.MAX_CLIENT_STATISTICS * 2; i++) {
+ for (int i = 0; i < 20; i++) {
observer.gotMessageFrom(randomUUID().toString(), "request", "127.0.0.1", 8080);
}
- assertEquals(CommunicationObserver.MAX_CLIENT_STATISTICS, observer.communicationPartners.size());
+ assertEquals(10, observer.communicationPartners.size());
}
@Test