You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2019/02/09 01:32:30 UTC
[geode] branch develop updated: GEODE-6355: Fix Synchronization of
Function stats in IDS. (#3158)
This is an automated email from the ASF dual-hosted git repository.
gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 127cf7b GEODE-6355: Fix Synchronization of Function stats in IDS. (#3158)
127cf7b is described below
commit 127cf7b1d2e8a27103ff99671e80949097863aef
Author: Galen O'Sullivan <go...@pivotal.io>
AuthorDate: Fri Feb 8 17:32:11 2019 -0800
GEODE-6355: Fix Synchronization of Function stats in IDS. (#3158)
* Don't use double-checked locking.
* Use computeIfAbsent instead of locking on the object.
Cleanup:
* Rename listeners to disconnectListeners.
* Remove dead code
* generify collections
* Change some loops to foreach
* Make a ThreadLocal use withInitial
* Make methods and fields private
---
.../internal/InternalDistributedSystem.java | 195 +++++----------------
.../internal/cache/execute/FunctionStats.java | 35 +---
2 files changed, 47 insertions(+), 183 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index d1fe386..1f6f45a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -135,7 +135,7 @@ public class InternalDistributedSystem extends DistributedSystem
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.ALLOW_MEMORY_OVERCOMMIT");
private static final Logger logger = LogService.getLogger();
- public static final String DISABLE_MANAGEMENT_PROPERTY =
+ private static final String DISABLE_MANAGEMENT_PROPERTY =
DistributionConfig.GEMFIRE_PREFIX + "disableManagement";
/**
@@ -159,7 +159,7 @@ public class InternalDistributedSystem extends DistributedSystem
// the following is overridden from DistributedTestCase to fix #51058
public static final AtomicReference<CreationStackGenerator> TEST_CREATION_STACK_GENERATOR =
- new AtomicReference<CreationStackGenerator>(DEFAULT_CREATION_STACK_GENERATOR);
+ new AtomicReference<>(DEFAULT_CREATION_STACK_GENERATOR);
/**
* A value of Boolean.TRUE will identify a thread being used to execute
@@ -280,7 +280,7 @@ public class InternalDistributedSystem extends DistributedSystem
/**
* Guards access to {@link #isConnected}
*/
- protected final Object isConnectedMutex = new Object();
+ private final Object isConnectedMutex = new Object();
/**
* Is this <code>DistributedSystem</code> connected to a distributed system?
@@ -305,12 +305,14 @@ public class InternalDistributedSystem extends DistributedSystem
* A set of listeners that are invoked when this connection to the distributed system is
* disconnected
*/
- private final Set listeners = new LinkedHashSet(); // needs to be ordered
+ private final Set<DisconnectListener> disconnectListeners = new LinkedHashSet<>(); // needs to be
+ // ordered
/**
* Set of listeners that are invoked whenever a connection is created to the distributed system
*/
- private static final Set connectListeners = new LinkedHashSet(); // needs to be ordered
+ // needs to be ordered
+ private static final Set<ConnectListener> connectListeners = new LinkedHashSet();
/**
* auto-reconnect listeners
@@ -546,14 +548,6 @@ public class InternalDistributedSystem extends DistributedSystem
return this.logWriter;
}
- public static InternalLogWriter getStaticSecurityInternalLogWriter() {
- InternalDistributedSystem sys = getAnyInstance();
- if (sys != null) {
- return sys.securityLogWriter;
- }
- return null;
- }
-
public InternalLogWriter getSecurityInternalLogWriter() {
InternalDistributedSystem sys = getAnyInstance();
if (sys != null) {
@@ -595,7 +589,7 @@ public class InternalDistributedSystem extends DistributedSystem
Object o = nonDefault.remove(DistributionConfig.DS_RECONNECTING_NAME);
if (o instanceof Boolean) {
- this.isReconnectingDS = ((Boolean) o).booleanValue();
+ this.isReconnectingDS = (Boolean) o;
} else {
this.isReconnectingDS = false;
}
@@ -1367,24 +1361,22 @@ public class InternalDistributedSystem extends DistributedSystem
* Disconnect cache, run disconnect listeners.
*
* @param doReconnect whether a reconnect will be done
- * @param reason the reason that the system is disconnecting
- *
* @return a collection of shutdownListeners
*/
- private HashSet doDisconnects(boolean doReconnect, String reason) {
+ private HashSet<ShutdownListener> doDisconnects(boolean doReconnect) {
// Make a pass over the disconnect listeners, asking them _politely_
// to clean up.
- HashSet shutdownListeners = new HashSet();
+ HashSet<ShutdownListener> shutdownListeners = new HashSet<>();
for (;;) {
- DisconnectListener listener = null;
- synchronized (this.listeners) {
- Iterator itr = listeners.iterator();
+ DisconnectListener listener;
+ synchronized (this.disconnectListeners) {
+ Iterator<DisconnectListener> itr = disconnectListeners.iterator();
if (!itr.hasNext()) {
- break;
+ return shutdownListeners;
}
- listener = (DisconnectListener) itr.next();
+ listener = itr.next();
if (listener instanceof ShutdownListener) {
- shutdownListeners.add(listener);
+ shutdownListeners.add((ShutdownListener) listener);
}
itr.remove();
} // synchronized
@@ -1395,7 +1387,6 @@ public class InternalDistributedSystem extends DistributedSystem
runDisconnect(listener);
}
} // for
- return shutdownListeners;
}
/**
@@ -1404,17 +1395,15 @@ public class InternalDistributedSystem extends DistributedSystem
*
* @param shutdownListeners shutdown listeners initially registered with us
*/
- private void doShutdownListeners(HashSet shutdownListeners) {
+ private void doShutdownListeners(HashSet<ShutdownListener> shutdownListeners) {
if (shutdownListeners == null) {
return;
}
// Process any shutdown listeners we reaped during first pass
- Iterator it = shutdownListeners.iterator();
- while (it.hasNext()) {
- ShutdownListener s = (ShutdownListener) it.next();
+ for (ShutdownListener shutdownListener : shutdownListeners) {
try {
- s.onShutdown(this);
+ shutdownListener.onShutdown(this);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
@@ -1428,7 +1417,7 @@ public class InternalDistributedSystem extends DistributedSystem
// is still usable:
SystemFailure.checkFailure();
// things could break since we continue, but we want to disconnect!
- logger.fatal(String.format("ShutdownListener < %s > threw...", s), t);
+ logger.fatal(String.format("ShutdownListener < %s > threw...", shutdownListener), t);
}
}
@@ -1440,12 +1429,12 @@ public class InternalDistributedSystem extends DistributedSystem
// Pluck next listener from the list
DisconnectListener dcListener = null;
ShutdownListener sdListener = null;
- synchronized (this.listeners) {
- Iterator itr = listeners.iterator();
+ synchronized (this.disconnectListeners) {
+ Iterator<DisconnectListener> itr = disconnectListeners.iterator();
if (!itr.hasNext()) {
break;
}
- dcListener = (DisconnectListener) itr.next();
+ dcListener = itr.next();
itr.remove();
if (dcListener instanceof ShutdownListener) {
sdListener = (ShutdownListener) dcListener;
@@ -1522,7 +1511,7 @@ public class InternalDistributedSystem extends DistributedSystem
dm.setRootCause(SystemFailure.getFailure());
}
this.isDisconnecting = true;
- this.listeners.clear();
+ this.disconnectListeners.clear();
if (DEBUG) {
System.err.println("DEBUG: done with InternalDistributedSystem#emergencyClose");
}
@@ -1574,7 +1563,7 @@ public class InternalDistributedSystem extends DistributedSystem
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
- HashSet shutdownListeners = null;
+ HashSet<ShutdownListener> shutdownListeners = null;
try {
if (isDebugEnabled) {
logger.debug("DistributedSystem.disconnect invoked on {}", this);
@@ -1633,7 +1622,7 @@ public class InternalDistributedSystem extends DistributedSystem
} // synchronized (GemFireCache.class)
if (!isShutdownHook) {
- shutdownListeners = doDisconnects(attemptingToReconnect, reason);
+ shutdownListeners = doDisconnects(attemptingToReconnect);
}
if (!this.attemptingToReconnect) {
@@ -2003,38 +1992,22 @@ public class InternalDistributedSystem extends DistributedSystem
// As the function execution stats can be lot in number, its better to put
// them in a map so that it will be accessible immediately
private final ConcurrentHashMap<String, FunctionStats> functionExecutionStatsMap =
- new ConcurrentHashMap<String, FunctionStats>();
+ new ConcurrentHashMap<>();
private FunctionServiceStats functionServiceStats = null;
public FunctionStats getFunctionStats(String textId) {
- FunctionStats stats = functionExecutionStatsMap.get(textId);
- if (stats == null) {
- stats = new FunctionStats(this, textId);
- FunctionStats oldStats = functionExecutionStatsMap.putIfAbsent(textId, stats);
- if (oldStats != null) {
- stats.close();
- stats = oldStats;
- }
- }
- return stats;
+ return functionExecutionStatsMap.computeIfAbsent(textId,
+ key -> new FunctionStats(this, key));
}
- public FunctionServiceStats getFunctionServiceStats() {
+ public synchronized FunctionServiceStats getFunctionServiceStats() {
if (functionServiceStats == null) {
- synchronized (this) {
- if (functionServiceStats == null) {
- functionServiceStats = new FunctionServiceStats(this, "FunctionExecution");
- }
- }
+ functionServiceStats = new FunctionServiceStats(this, "FunctionExecution");
}
return functionServiceStats;
}
- public Set<String> getAllFunctionExecutionIds() {
- return functionExecutionStatsMap.keySet();
- }
-
/**
* Makes note of a <code>ConnectListener</code> whose <code>onConnect</code> method will be
@@ -2059,8 +2032,6 @@ public class InternalDistributedSystem extends DistributedSystem
* The ReconnectListener set is cleared after a disconnect.
*/
public static void addReconnectListener(ReconnectListener listener) {
- // (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("registering reconnect
- // listener: " + listener);
synchronized (existingSystemsLock) {
synchronized (reconnectListeners) {
reconnectListeners.add(listener);
@@ -2069,26 +2040,13 @@ public class InternalDistributedSystem extends DistributedSystem
}
/**
- * Removes a <code>ConnectListener</code> from the list of listeners that will be notified when a
- * connection is created to a distributed system.
- *
- * @return true if listener was in the list
- */
- public static boolean removeConnectListener(ConnectListener listener) {
- synchronized (connectListeners) {
- return connectListeners.remove(listener);
- }
- }
-
- /**
* Notifies all registered <code>ConnectListener</code>s that a connection to a distributed system
* has been created.
*/
private static void notifyConnectListeners(InternalDistributedSystem sys) {
synchronized (connectListeners) {
- for (Iterator iter = connectListeners.iterator(); iter.hasNext();) {
+ for (ConnectListener listener : connectListeners) {
try {
- ConnectListener listener = (ConnectListener) iter.next();
listener.onConnect(sys);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
@@ -2110,16 +2068,6 @@ public class InternalDistributedSystem extends DistributedSystem
}
/**
- * Removes a <code>ReconnectListener</code> from the list of listeners that will be notified when
- * a connection is recreated to a distributed system.
- */
- public static void removeReconnectListener(ReconnectListener listener) {
- synchronized (reconnectListeners) {
- reconnectListeners.remove(listener);
- }
- }
-
- /**
* Notifies all registered <code>ReconnectListener</code>s that a connection to a distributed
* system has been recreated.
*/
@@ -2127,7 +2075,7 @@ public class InternalDistributedSystem extends DistributedSystem
InternalDistributedSystem newsys, boolean starting) {
List<ReconnectListener> listeners;
synchronized (reconnectListeners) {
- listeners = new ArrayList<ReconnectListener>(reconnectListeners);
+ listeners = new ArrayList<>(reconnectListeners);
}
for (ReconnectListener listener : listeners) {
try {
@@ -2193,8 +2141,8 @@ public class InternalDistributedSystem extends DistributedSystem
* invoked when this connection to the distributed system is disconnected.
*/
public void addDisconnectListener(DisconnectListener listener) {
- synchronized (this.listeners) {
- this.listeners.add(listener);
+ synchronized (this.disconnectListeners) {
+ this.disconnectListeners.add(listener);
boolean disconnectThreadBoolean = isDisconnectThread.get();
@@ -2205,7 +2153,7 @@ public class InternalDistributedSystem extends DistributedSystem
// other shutdown conditions will presumably get flagged.
String reason = this.stopper.cancelInProgress();
if (reason != null) {
- this.listeners.remove(listener); // don't leave in the list!
+ this.disconnectListeners.remove(listener); // don't leave in the list!
throw new DistributedSystemDisconnectedException(
String.format("No listeners permitted after shutdown: %s",
reason),
@@ -2218,12 +2166,10 @@ public class InternalDistributedSystem extends DistributedSystem
/**
* Removes a <code>DisconnectListener</code> from the list of listeners that will be notified when
* this connection to the distributed system is disconnected.
- *
- * @return true if listener was in the list
*/
- public boolean removeDisconnectListener(DisconnectListener listener) {
- synchronized (this.listeners) {
- return this.listeners.remove(listener);
+ public void removeDisconnectListener(DisconnectListener listener) {
+ synchronized (this.disconnectListeners) {
+ this.disconnectListeners.remove(listener);
}
}
@@ -2258,16 +2204,6 @@ public class InternalDistributedSystem extends DistributedSystem
}
/**
- * Fires an "informational" <code>SystemMembershipEvent</code> in admin VMs.
- *
- * @since GemFire 4.0
- */
- public void fireInfoEvent(Object callback) {
- throw new UnsupportedOperationException(
- "Not implemented yet");
- }
-
- /**
* Installs a shutdown hook to ensure that we are disconnected if an application VM shuts down
* without first calling disconnect itself.
*/
@@ -2281,11 +2217,11 @@ public class InternalDistributedSystem extends DistributedSystem
// Added for bug 38407
if (!Boolean.getBoolean(DISABLE_SHUTDOWN_HOOK_PROPERTY)) {
tmp_shutdownHook = new LoggingThread(SHUTDOWN_HOOK_NAME, false, () -> {
- DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
+ InternalDistributedSystem ds = InternalDistributedSystem.getAnyInstance();
setThreadsSocketPolicy(true /* conserve sockets */);
if (ds != null && ds.isConnected()) {
logger.info("VM is exiting - shutting down distributed system");
- DurableClientAttributes dca = ((InternalDistributedSystem) ds).getDistributedMember()
+ DurableClientAttributes dca = ds.getDistributedMember()
.getDurableClientAttributes();
boolean isDurableClient = false;
@@ -2293,7 +2229,7 @@ public class InternalDistributedSystem extends DistributedSystem
isDurableClient = (!(dca.getId() == null || dca.getId().isEmpty()));
}
- ((InternalDistributedSystem) ds).disconnect(false,
+ ds.disconnect(false,
"normal disconnect",
isDurableClient/* keep alive drive from this */);
// this was how we wanted to do it for 5.7, but there were shutdown
@@ -2368,11 +2304,6 @@ public class InternalDistributedSystem extends DistributedSystem
private static volatile int reconnectAttemptCounter = 0;
/**
- * The time at which reconnect attempts last began
- */
- private static long reconnectAttemptTime;
-
- /**
* Boolean indicating if DS needs to reconnect and reconnect is in progress.
*/
private volatile boolean attemptingToReconnect = false;
@@ -2462,9 +2393,7 @@ public class InternalDistributedSystem extends DistributedSystem
* Make sure this instance of DS never does a reconnect. Also if reconnect is in progress cancel
* it.
*/
- public void cancelReconnect() {
- // (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("cancelReconnect invoked",
- // new Exception("stack trace");
+ private void cancelReconnect() {
this.reconnectCancelled = true;
if (isReconnecting()) {
synchronized (this.reconnectLock) { // should the synchronized be first on this and
@@ -2632,9 +2561,6 @@ public class InternalDistributedSystem extends DistributedSystem
}
}
- if (reconnectAttemptCounter == 0) {
- reconnectAttemptTime = System.currentTimeMillis();
- }
reconnectAttemptCounter++;
if (isReconnectCancelled()) {
@@ -2830,12 +2756,10 @@ public class InternalDistributedSystem extends DistributedSystem
reconnectDS.disconnect();
}
attemptingToReconnect = false;
- return;
} else if (reconnectDS != null && reconnectDS.isConnected()) {
logger.info("Reconnect completed.\nNew DistributedSystem is {}\nNew Cache is {}", reconnectDS,
cache);
}
-
}
@@ -2886,8 +2810,7 @@ public class InternalDistributedSystem extends DistributedSystem
DistributionConfig wanted = DistributionConfigImpl.produce(propsToCheck);
String[] validAttributeNames = this.originalConfig.getAttributeNames();
- for (int i = 0; i < validAttributeNames.length; i++) {
- String attName = validAttributeNames[i];
+ for (String attName : validAttributeNames) {
Object expectedAtt = wanted.getAttributeObject(attName);
String expectedAttStr = expectedAtt.toString();
Object actualAtt = this.originalConfig.getAttributeObject(attName);
@@ -2963,14 +2886,6 @@ public class InternalDistributedSystem extends DistributedSystem
void onConnect(InternalDistributedSystem sys);
}
- public String forceStop() {
- if (this.dm == null) {
- return "no distribution manager";
- }
- String reason = dm.getCancelCriterion().cancelInProgress();
- return reason;
- }
-
public boolean hasAlertListenerFor(DistributedMember member) {
return hasAlertListenerFor(member, AlertLevel.WARNING.intLevel());
}
@@ -2992,22 +2907,6 @@ public class InternalDistributedSystem extends DistributedSystem
DistributedSystem.setEnableAdministrationOnly(adminOnly);
}
- public boolean isServerLocator() {
- return this.startedLocator.isServerLocator();
- }
-
- /**
- * Provides synchronized time for this process based on other processes in this GemFire
- * distributed system. GemFire distributed system coordinator adjusts each member's time by an
- * offset. This offset for each member is calculated based on Berkeley Time Synchronization
- * algorithm.
- *
- * @return time in milliseconds.
- */
- public long systemTimeMillis() {
- return dm.cacheTimeMillis();
- }
-
@Override
public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException {
int sleepTime = 1000;
@@ -3018,8 +2917,6 @@ public class InternalDistributedSystem extends DistributedSystem
endTime += TimeUnit.MILLISECONDS.convert(time, units);
}
synchronized (this.reconnectLock) {
- InternalDistributedSystem recon = this.reconnectDS;
-
while (isReconnecting()) {
if (this.reconnectCancelled) {
break;
@@ -3032,7 +2929,7 @@ public class InternalDistributedSystem extends DistributedSystem
}
}
- recon = this.reconnectDS;
+ InternalDistributedSystem recon = this.reconnectDS;
return !attemptingToReconnect && recon != null && recon.isConnected();
}
}
@@ -3044,8 +2941,6 @@ public class InternalDistributedSystem extends DistributedSystem
@Override
public void stopReconnecting() {
- // (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("stopReconnecting invoked",
- // new Exception("stack trace");
this.reconnectCancelled = true;
synchronized (this.reconnectLock) {
this.reconnectLock.notify();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionStats.java
index 0c87d11..9196e2b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionStats.java
@@ -88,18 +88,6 @@ public class FunctionStats {
*/
private static final String FUNCTION_EXECUTION_EXCEPTIONS = "functionExecutionsExceptions";
- // /**
- // * Total number of bytes received before invoking the function
- // * Name of the functionExecution bytes received statistic
- // */
- // private static final String BYTES_RECEIVED = "bytesReceived";
- //
- // /**
- // * Total number of bytes serialized for the result of the function
- // * Name of the bytes serialized statistic
- // */
- // private static final String BYTES_SERIALIZED = "bytesSerialized";
-
/** Id of the FUNCTION_EXECUTIONS_COMPLETED statistic */
private static final int _functionExecutionsCompletedId;
@@ -127,13 +115,6 @@ public class FunctionStats {
/** Id of the FUNCTION_EXECUTIONS_EXCEPTIONS statistic */
private static final int _functionExecutionExceptions;
- // /** Id of the RESULTS_RECEIVED statistic */
- // private static final int _bytesReceived;
- //
- // /** Id of the FUNCTION_EXECUTIONS_EXCEPTIONS statistic */
- // private static final int _bytesSerialized;
-
-
/**
* Static initializer to create and initialize the <code>StatisticsType</code>
*/
@@ -173,18 +154,8 @@ public class FunctionStats {
f.createIntCounter(FUNCTION_EXECUTION_EXCEPTIONS,
"Total number of Exceptions Occurred while executing function", "operations"),
-
- // f
- // .createLongCounter(
- // BYTES_RECEIVED,
- // "Total number of bytes received before invoking the function",
- // "Bytes"),
- // f
- // .createLongCounter(
- // BYTES_SERIALIZED,
- // "Total number of bytes serialized for the result of the function",
- // "Bytes"),
});
+
// Initialize id fields
_functionExecutionsCompletedId = _type.nameToId(FUNCTION_EXECUTIONS_COMPLETED);
_functionExecutionsCompletedProcessingTimeId =
@@ -197,8 +168,6 @@ public class FunctionStats {
_functionExecutionsHasResultRunningId = _type.nameToId(FUNCTION_EXECUTIONS_HASRESULT_RUNNING);
_functionExecutionExceptions = _type.nameToId(FUNCTION_EXECUTION_EXCEPTIONS);
_resultsReceived = _type.nameToId(RESULTS_RECEIVED);
- // _bytesReceived = _type.nameToId(BYTES_RECEIVED);
- // _bytesSerialized = _type.nameToId(BYTES_SERIALIZED);
}
// //////////////////// Instance Fields //////////////////////
@@ -212,7 +181,7 @@ public class FunctionStats {
// ///////////////////// Constructors ///////////////////////
private FunctionStats() {
- this._stats = new DummyStatisticsImpl(this._type, null, 0);
+ this._stats = new DummyStatisticsImpl(_type, null, 0);
this.aggregateStats = FunctionServiceStats.createDummy();
}