You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2019/02/09 01:32:30 UTC

[geode] branch develop updated: GEODE-6355: Fix Synchronization of Function stats in IDS. (#3158)

This is an automated email from the ASF dual-hosted git repository.

gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 127cf7b  GEODE-6355: Fix Synchronization of Function stats in IDS. (#3158)
127cf7b is described below

commit 127cf7b1d2e8a27103ff99671e80949097863aef
Author: Galen O'Sullivan <go...@pivotal.io>
AuthorDate: Fri Feb 8 17:32:11 2019 -0800

    GEODE-6355: Fix Synchronization of Function stats in IDS. (#3158)
    
    * Don't use double-checked locking.
    * Use computeIfAbsent instead of locking on the object.
    
    Cleanup:
    * Rename listeners to disconnectListeners.
    * Remove dead code.
    * Generify collections.
    * Change some loops to for-each.
    * Make a ThreadLocal use withInitial.
    * Make methods and fields private.
---
 .../internal/InternalDistributedSystem.java        | 195 +++++----------------
 .../internal/cache/execute/FunctionStats.java      |  35 +---
 2 files changed, 47 insertions(+), 183 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index d1fe386..1f6f45a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -135,7 +135,7 @@ public class InternalDistributedSystem extends DistributedSystem
       Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.ALLOW_MEMORY_OVERCOMMIT");
   private static final Logger logger = LogService.getLogger();
 
-  public static final String DISABLE_MANAGEMENT_PROPERTY =
+  private static final String DISABLE_MANAGEMENT_PROPERTY =
       DistributionConfig.GEMFIRE_PREFIX + "disableManagement";
 
   /**
@@ -159,7 +159,7 @@ public class InternalDistributedSystem extends DistributedSystem
 
   // the following is overridden from DistributedTestCase to fix #51058
   public static final AtomicReference<CreationStackGenerator> TEST_CREATION_STACK_GENERATOR =
-      new AtomicReference<CreationStackGenerator>(DEFAULT_CREATION_STACK_GENERATOR);
+      new AtomicReference<>(DEFAULT_CREATION_STACK_GENERATOR);
 
   /**
    * A value of Boolean.TRUE will identify a thread being used to execute
@@ -280,7 +280,7 @@ public class InternalDistributedSystem extends DistributedSystem
   /**
    * Guards access to {@link #isConnected}
    */
-  protected final Object isConnectedMutex = new Object();
+  private final Object isConnectedMutex = new Object();
 
   /**
    * Is this <code>DistributedSystem</code> connected to a distributed system?
@@ -305,12 +305,14 @@ public class InternalDistributedSystem extends DistributedSystem
    * A set of listeners that are invoked when this connection to the distributed system is
    * disconnected
    */
-  private final Set listeners = new LinkedHashSet(); // needs to be ordered
+  private final Set<DisconnectListener> disconnectListeners = new LinkedHashSet<>(); // needs to be
+                                                                                     // ordered
 
   /**
    * Set of listeners that are invoked whenever a connection is created to the distributed system
    */
-  private static final Set connectListeners = new LinkedHashSet(); // needs to be ordered
+  // needs to be ordered
+  private static final Set<ConnectListener> connectListeners = new LinkedHashSet();
 
   /**
    * auto-reconnect listeners
@@ -546,14 +548,6 @@ public class InternalDistributedSystem extends DistributedSystem
     return this.logWriter;
   }
 
-  public static InternalLogWriter getStaticSecurityInternalLogWriter() {
-    InternalDistributedSystem sys = getAnyInstance();
-    if (sys != null) {
-      return sys.securityLogWriter;
-    }
-    return null;
-  }
-
   public InternalLogWriter getSecurityInternalLogWriter() {
     InternalDistributedSystem sys = getAnyInstance();
     if (sys != null) {
@@ -595,7 +589,7 @@ public class InternalDistributedSystem extends DistributedSystem
 
     Object o = nonDefault.remove(DistributionConfig.DS_RECONNECTING_NAME);
     if (o instanceof Boolean) {
-      this.isReconnectingDS = ((Boolean) o).booleanValue();
+      this.isReconnectingDS = (Boolean) o;
     } else {
       this.isReconnectingDS = false;
     }
@@ -1367,24 +1361,22 @@ public class InternalDistributedSystem extends DistributedSystem
    * Disconnect cache, run disconnect listeners.
    *
    * @param doReconnect whether a reconnect will be done
-   * @param reason the reason that the system is disconnecting
-   *
    * @return a collection of shutdownListeners
    */
-  private HashSet doDisconnects(boolean doReconnect, String reason) {
+  private HashSet<ShutdownListener> doDisconnects(boolean doReconnect) {
     // Make a pass over the disconnect listeners, asking them _politely_
     // to clean up.
-    HashSet shutdownListeners = new HashSet();
+    HashSet<ShutdownListener> shutdownListeners = new HashSet<>();
     for (;;) {
-      DisconnectListener listener = null;
-      synchronized (this.listeners) {
-        Iterator itr = listeners.iterator();
+      DisconnectListener listener;
+      synchronized (this.disconnectListeners) {
+        Iterator<DisconnectListener> itr = disconnectListeners.iterator();
         if (!itr.hasNext()) {
-          break;
+          return shutdownListeners;
         }
-        listener = (DisconnectListener) itr.next();
+        listener = itr.next();
         if (listener instanceof ShutdownListener) {
-          shutdownListeners.add(listener);
+          shutdownListeners.add((ShutdownListener) listener);
         }
         itr.remove();
       } // synchronized
@@ -1395,7 +1387,6 @@ public class InternalDistributedSystem extends DistributedSystem
         runDisconnect(listener);
       }
     } // for
-    return shutdownListeners;
   }
 
   /**
@@ -1404,17 +1395,15 @@ public class InternalDistributedSystem extends DistributedSystem
    *
    * @param shutdownListeners shutdown listeners initially registered with us
    */
-  private void doShutdownListeners(HashSet shutdownListeners) {
+  private void doShutdownListeners(HashSet<ShutdownListener> shutdownListeners) {
     if (shutdownListeners == null) {
       return;
     }
 
     // Process any shutdown listeners we reaped during first pass
-    Iterator it = shutdownListeners.iterator();
-    while (it.hasNext()) {
-      ShutdownListener s = (ShutdownListener) it.next();
+    for (ShutdownListener shutdownListener : shutdownListeners) {
       try {
-        s.onShutdown(this);
+        shutdownListener.onShutdown(this);
       } catch (VirtualMachineError err) {
         SystemFailure.initiateFailure(err);
         // If this ever returns, rethrow the error. We're poisoned
@@ -1428,7 +1417,7 @@ public class InternalDistributedSystem extends DistributedSystem
         // is still usable:
         SystemFailure.checkFailure();
         // things could break since we continue, but we want to disconnect!
-        logger.fatal(String.format("ShutdownListener < %s > threw...", s), t);
+        logger.fatal(String.format("ShutdownListener < %s > threw...", shutdownListener), t);
       }
     }
 
@@ -1440,12 +1429,12 @@ public class InternalDistributedSystem extends DistributedSystem
       // Pluck next listener from the list
       DisconnectListener dcListener = null;
       ShutdownListener sdListener = null;
-      synchronized (this.listeners) {
-        Iterator itr = listeners.iterator();
+      synchronized (this.disconnectListeners) {
+        Iterator<DisconnectListener> itr = disconnectListeners.iterator();
         if (!itr.hasNext()) {
           break;
         }
-        dcListener = (DisconnectListener) itr.next();
+        dcListener = itr.next();
         itr.remove();
         if (dcListener instanceof ShutdownListener) {
           sdListener = (ShutdownListener) dcListener;
@@ -1522,7 +1511,7 @@ public class InternalDistributedSystem extends DistributedSystem
       dm.setRootCause(SystemFailure.getFailure());
     }
     this.isDisconnecting = true;
-    this.listeners.clear();
+    this.disconnectListeners.clear();
     if (DEBUG) {
       System.err.println("DEBUG: done with InternalDistributedSystem#emergencyClose");
     }
@@ -1574,7 +1563,7 @@ public class InternalDistributedSystem extends DistributedSystem
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
-      HashSet shutdownListeners = null;
+      HashSet<ShutdownListener> shutdownListeners = null;
       try {
         if (isDebugEnabled) {
           logger.debug("DistributedSystem.disconnect invoked on {}", this);
@@ -1633,7 +1622,7 @@ public class InternalDistributedSystem extends DistributedSystem
         } // synchronized (GemFireCache.class)
 
         if (!isShutdownHook) {
-          shutdownListeners = doDisconnects(attemptingToReconnect, reason);
+          shutdownListeners = doDisconnects(attemptingToReconnect);
         }
 
         if (!this.attemptingToReconnect) {
@@ -2003,38 +1992,22 @@ public class InternalDistributedSystem extends DistributedSystem
   // As the function execution stats can be lot in number, its better to put
   // them in a map so that it will be accessible immediately
   private final ConcurrentHashMap<String, FunctionStats> functionExecutionStatsMap =
-      new ConcurrentHashMap<String, FunctionStats>();
+      new ConcurrentHashMap<>();
   private FunctionServiceStats functionServiceStats = null;
 
   public FunctionStats getFunctionStats(String textId) {
-    FunctionStats stats = functionExecutionStatsMap.get(textId);
-    if (stats == null) {
-      stats = new FunctionStats(this, textId);
-      FunctionStats oldStats = functionExecutionStatsMap.putIfAbsent(textId, stats);
-      if (oldStats != null) {
-        stats.close();
-        stats = oldStats;
-      }
-    }
-    return stats;
+    return functionExecutionStatsMap.computeIfAbsent(textId,
+        key -> new FunctionStats(this, key));
   }
 
 
-  public FunctionServiceStats getFunctionServiceStats() {
+  public synchronized FunctionServiceStats getFunctionServiceStats() {
     if (functionServiceStats == null) {
-      synchronized (this) {
-        if (functionServiceStats == null) {
-          functionServiceStats = new FunctionServiceStats(this, "FunctionExecution");
-        }
-      }
+      functionServiceStats = new FunctionServiceStats(this, "FunctionExecution");
     }
     return functionServiceStats;
   }
 
-  public Set<String> getAllFunctionExecutionIds() {
-    return functionExecutionStatsMap.keySet();
-  }
-
 
   /**
    * Makes note of a <code>ConnectListener</code> whose <code>onConnect</code> method will be
@@ -2059,8 +2032,6 @@ public class InternalDistributedSystem extends DistributedSystem
    * The ReconnectListener set is cleared after a disconnect.
    */
   public static void addReconnectListener(ReconnectListener listener) {
-    // (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("registering reconnect
-    // listener: " + listener);
     synchronized (existingSystemsLock) {
       synchronized (reconnectListeners) {
         reconnectListeners.add(listener);
@@ -2069,26 +2040,13 @@ public class InternalDistributedSystem extends DistributedSystem
   }
 
   /**
-   * Removes a <code>ConnectListener</code> from the list of listeners that will be notified when a
-   * connection is created to a distributed system.
-   *
-   * @return true if listener was in the list
-   */
-  public static boolean removeConnectListener(ConnectListener listener) {
-    synchronized (connectListeners) {
-      return connectListeners.remove(listener);
-    }
-  }
-
-  /**
    * Notifies all registered <code>ConnectListener</code>s that a connection to a distributed system
    * has been created.
    */
   private static void notifyConnectListeners(InternalDistributedSystem sys) {
     synchronized (connectListeners) {
-      for (Iterator iter = connectListeners.iterator(); iter.hasNext();) {
+      for (ConnectListener listener : connectListeners) {
         try {
-          ConnectListener listener = (ConnectListener) iter.next();
           listener.onConnect(sys);
         } catch (VirtualMachineError err) {
           SystemFailure.initiateFailure(err);
@@ -2110,16 +2068,6 @@ public class InternalDistributedSystem extends DistributedSystem
   }
 
   /**
-   * Removes a <code>ReconnectListener</code> from the list of listeners that will be notified when
-   * a connection is recreated to a distributed system.
-   */
-  public static void removeReconnectListener(ReconnectListener listener) {
-    synchronized (reconnectListeners) {
-      reconnectListeners.remove(listener);
-    }
-  }
-
-  /**
    * Notifies all registered <code>ReconnectListener</code>s that a connection to a distributed
    * system has been recreated.
    */
@@ -2127,7 +2075,7 @@ public class InternalDistributedSystem extends DistributedSystem
       InternalDistributedSystem newsys, boolean starting) {
     List<ReconnectListener> listeners;
     synchronized (reconnectListeners) {
-      listeners = new ArrayList<ReconnectListener>(reconnectListeners);
+      listeners = new ArrayList<>(reconnectListeners);
     }
     for (ReconnectListener listener : listeners) {
       try {
@@ -2193,8 +2141,8 @@ public class InternalDistributedSystem extends DistributedSystem
    * invoked when this connection to the distributed system is disconnected.
    */
   public void addDisconnectListener(DisconnectListener listener) {
-    synchronized (this.listeners) {
-      this.listeners.add(listener);
+    synchronized (this.disconnectListeners) {
+      this.disconnectListeners.add(listener);
 
       boolean disconnectThreadBoolean = isDisconnectThread.get();
 
@@ -2205,7 +2153,7 @@ public class InternalDistributedSystem extends DistributedSystem
         // other shutdown conditions will presumably get flagged.
         String reason = this.stopper.cancelInProgress();
         if (reason != null) {
-          this.listeners.remove(listener); // don't leave in the list!
+          this.disconnectListeners.remove(listener); // don't leave in the list!
           throw new DistributedSystemDisconnectedException(
               String.format("No listeners permitted after shutdown: %s",
                   reason),
@@ -2218,12 +2166,10 @@ public class InternalDistributedSystem extends DistributedSystem
   /**
    * Removes a <code>DisconnectListener</code> from the list of listeners that will be notified when
    * this connection to the distributed system is disconnected.
-   *
-   * @return true if listener was in the list
    */
-  public boolean removeDisconnectListener(DisconnectListener listener) {
-    synchronized (this.listeners) {
-      return this.listeners.remove(listener);
+  public void removeDisconnectListener(DisconnectListener listener) {
+    synchronized (this.disconnectListeners) {
+      this.disconnectListeners.remove(listener);
     }
   }
 
@@ -2258,16 +2204,6 @@ public class InternalDistributedSystem extends DistributedSystem
   }
 
   /**
-   * Fires an "informational" <code>SystemMembershipEvent</code> in admin VMs.
-   *
-   * @since GemFire 4.0
-   */
-  public void fireInfoEvent(Object callback) {
-    throw new UnsupportedOperationException(
-        "Not implemented yet");
-  }
-
-  /**
    * Installs a shutdown hook to ensure that we are disconnected if an application VM shuts down
    * without first calling disconnect itself.
    */
@@ -2281,11 +2217,11 @@ public class InternalDistributedSystem extends DistributedSystem
       // Added for bug 38407
       if (!Boolean.getBoolean(DISABLE_SHUTDOWN_HOOK_PROPERTY)) {
         tmp_shutdownHook = new LoggingThread(SHUTDOWN_HOOK_NAME, false, () -> {
-          DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
+          InternalDistributedSystem ds = InternalDistributedSystem.getAnyInstance();
           setThreadsSocketPolicy(true /* conserve sockets */);
           if (ds != null && ds.isConnected()) {
             logger.info("VM is exiting - shutting down distributed system");
-            DurableClientAttributes dca = ((InternalDistributedSystem) ds).getDistributedMember()
+            DurableClientAttributes dca = ds.getDistributedMember()
                 .getDurableClientAttributes();
             boolean isDurableClient = false;
 
@@ -2293,7 +2229,7 @@ public class InternalDistributedSystem extends DistributedSystem
               isDurableClient = (!(dca.getId() == null || dca.getId().isEmpty()));
             }
 
-            ((InternalDistributedSystem) ds).disconnect(false,
+            ds.disconnect(false,
                 "normal disconnect",
                 isDurableClient/* keep alive drive from this */);
             // this was how we wanted to do it for 5.7, but there were shutdown
@@ -2368,11 +2304,6 @@ public class InternalDistributedSystem extends DistributedSystem
   private static volatile int reconnectAttemptCounter = 0;
 
   /**
-   * The time at which reconnect attempts last began
-   */
-  private static long reconnectAttemptTime;
-
-  /**
    * Boolean indicating if DS needs to reconnect and reconnect is in progress.
    */
   private volatile boolean attemptingToReconnect = false;
@@ -2462,9 +2393,7 @@ public class InternalDistributedSystem extends DistributedSystem
    * Make sure this instance of DS never does a reconnect. Also if reconnect is in progress cancel
    * it.
    */
-  public void cancelReconnect() {
-    // (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("cancelReconnect invoked",
-    // new Exception("stack trace");
+  private void cancelReconnect() {
     this.reconnectCancelled = true;
     if (isReconnecting()) {
       synchronized (this.reconnectLock) { // should the synchronized be first on this and
@@ -2632,9 +2561,6 @@ public class InternalDistributedSystem extends DistributedSystem
           }
         }
 
-        if (reconnectAttemptCounter == 0) {
-          reconnectAttemptTime = System.currentTimeMillis();
-        }
         reconnectAttemptCounter++;
 
         if (isReconnectCancelled()) {
@@ -2830,12 +2756,10 @@ public class InternalDistributedSystem extends DistributedSystem
         reconnectDS.disconnect();
       }
       attemptingToReconnect = false;
-      return;
     } else if (reconnectDS != null && reconnectDS.isConnected()) {
       logger.info("Reconnect completed.\nNew DistributedSystem is {}\nNew Cache is {}", reconnectDS,
           cache);
     }
-
   }
 
 
@@ -2886,8 +2810,7 @@ public class InternalDistributedSystem extends DistributedSystem
       DistributionConfig wanted = DistributionConfigImpl.produce(propsToCheck);
 
       String[] validAttributeNames = this.originalConfig.getAttributeNames();
-      for (int i = 0; i < validAttributeNames.length; i++) {
-        String attName = validAttributeNames[i];
+      for (String attName : validAttributeNames) {
         Object expectedAtt = wanted.getAttributeObject(attName);
         String expectedAttStr = expectedAtt.toString();
         Object actualAtt = this.originalConfig.getAttributeObject(attName);
@@ -2963,14 +2886,6 @@ public class InternalDistributedSystem extends DistributedSystem
     void onConnect(InternalDistributedSystem sys);
   }
 
-  public String forceStop() {
-    if (this.dm == null) {
-      return "no distribution manager";
-    }
-    String reason = dm.getCancelCriterion().cancelInProgress();
-    return reason;
-  }
-
   public boolean hasAlertListenerFor(DistributedMember member) {
     return hasAlertListenerFor(member, AlertLevel.WARNING.intLevel());
   }
@@ -2992,22 +2907,6 @@ public class InternalDistributedSystem extends DistributedSystem
     DistributedSystem.setEnableAdministrationOnly(adminOnly);
   }
 
-  public boolean isServerLocator() {
-    return this.startedLocator.isServerLocator();
-  }
-
-  /**
-   * Provides synchronized time for this process based on other processes in this GemFire
-   * distributed system. GemFire distributed system coordinator adjusts each member's time by an
-   * offset. This offset for each member is calculated based on Berkeley Time Synchronization
-   * algorithm.
-   *
-   * @return time in milliseconds.
-   */
-  public long systemTimeMillis() {
-    return dm.cacheTimeMillis();
-  }
-
   @Override
   public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException {
     int sleepTime = 1000;
@@ -3018,8 +2917,6 @@ public class InternalDistributedSystem extends DistributedSystem
       endTime += TimeUnit.MILLISECONDS.convert(time, units);
     }
     synchronized (this.reconnectLock) {
-      InternalDistributedSystem recon = this.reconnectDS;
-
       while (isReconnecting()) {
         if (this.reconnectCancelled) {
           break;
@@ -3032,7 +2929,7 @@ public class InternalDistributedSystem extends DistributedSystem
         }
       }
 
-      recon = this.reconnectDS;
+      InternalDistributedSystem recon = this.reconnectDS;
       return !attemptingToReconnect && recon != null && recon.isConnected();
     }
   }
@@ -3044,8 +2941,6 @@ public class InternalDistributedSystem extends DistributedSystem
 
   @Override
   public void stopReconnecting() {
-    // (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("stopReconnecting invoked",
-    // new Exception("stack trace");
     this.reconnectCancelled = true;
     synchronized (this.reconnectLock) {
       this.reconnectLock.notify();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionStats.java
index 0c87d11..9196e2b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionStats.java
@@ -88,18 +88,6 @@ public class FunctionStats {
    */
   private static final String FUNCTION_EXECUTION_EXCEPTIONS = "functionExecutionsExceptions";
 
-  // /**
-  // * Total number of bytes received before invoking the function
-  // * Name of the functionExecution bytes received statistic
-  // */
-  // private static final String BYTES_RECEIVED = "bytesReceived";
-  //
-  // /**
-  // * Total number of bytes serialized for the result of the function
-  // * Name of the bytes serialized statistic
-  // */
-  // private static final String BYTES_SERIALIZED = "bytesSerialized";
-
   /** Id of the FUNCTION_EXECUTIONS_COMPLETED statistic */
   private static final int _functionExecutionsCompletedId;
 
@@ -127,13 +115,6 @@ public class FunctionStats {
   /** Id of the FUNCTION_EXECUTIONS_EXCEPTIONS statistic */
   private static final int _functionExecutionExceptions;
 
-  // /** Id of the RESULTS_RECEIVED statistic */
-  // private static final int _bytesReceived;
-  //
-  // /** Id of the FUNCTION_EXECUTIONS_EXCEPTIONS statistic */
-  // private static final int _bytesSerialized;
-
-
   /**
    * Static initializer to create and initialize the <code>StatisticsType</code>
    */
@@ -173,18 +154,8 @@ public class FunctionStats {
 
             f.createIntCounter(FUNCTION_EXECUTION_EXCEPTIONS,
                 "Total number of Exceptions Occurred while executing function", "operations"),
-
-        // f
-        // .createLongCounter(
-        // BYTES_RECEIVED,
-        // "Total number of bytes received before invoking the function",
-        // "Bytes"),
-        // f
-        // .createLongCounter(
-        // BYTES_SERIALIZED,
-        // "Total number of bytes serialized for the result of the function",
-        // "Bytes"),
         });
+
     // Initialize id fields
     _functionExecutionsCompletedId = _type.nameToId(FUNCTION_EXECUTIONS_COMPLETED);
     _functionExecutionsCompletedProcessingTimeId =
@@ -197,8 +168,6 @@ public class FunctionStats {
     _functionExecutionsHasResultRunningId = _type.nameToId(FUNCTION_EXECUTIONS_HASRESULT_RUNNING);
     _functionExecutionExceptions = _type.nameToId(FUNCTION_EXECUTION_EXCEPTIONS);
     _resultsReceived = _type.nameToId(RESULTS_RECEIVED);
-    // _bytesReceived = _type.nameToId(BYTES_RECEIVED);
-    // _bytesSerialized = _type.nameToId(BYTES_SERIALIZED);
   }
 
   // //////////////////// Instance Fields //////////////////////
@@ -212,7 +181,7 @@ public class FunctionStats {
   // ///////////////////// Constructors ///////////////////////
 
   private FunctionStats() {
-    this._stats = new DummyStatisticsImpl(this._type, null, 0);
+    this._stats = new DummyStatisticsImpl(_type, null, 0);
     this.aggregateStats = FunctionServiceStats.createDummy();
   }