You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/05/08 11:31:43 UTC

[plc4x] 01/02: Implemented JMX for the triggered scraper.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/jmx-for-scraper-integration
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 0ff0ba28516d2b41d50d6976428698417fd1f6bd
Author: julian <j....@pragmaticminds.de>
AuthorDate: Wed May 8 11:07:03 2019 +0200

    Implemented JMX for the triggered scraper.
---
 .../org/apache/plc4x/java/scraper/Scraper.java     |  2 +-
 .../triggeredscraper/TriggeredScraperImpl.java     | 59 +++++++++++++++++-----
 .../TriggeredScraperImplMBean.java}                | 22 +++-----
 .../triggeredscraper/TriggeredScraperTask.java     | 48 +++++++++++++-----
 .../TriggeredScraperTaskMBean.java}                | 26 ++++------
 5 files changed, 100 insertions(+), 57 deletions(-)

diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index cd9ab7f..0fd7a2a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -22,7 +22,7 @@ package org.apache.plc4x.java.scraper;
 /**
  * Main interface that orchestrates scraping.
  */
-public interface Scraper {
+public interface Scraper{
     /**
      * Start the scraping.
      */
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 40ad5d5..46cf44f 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@ -37,6 +37,8 @@ import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.*;
+import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -54,9 +56,11 @@ import java.util.concurrent.*;
  *     right now boolean variables as well as numeric variables could be used as data-types
  *     available comparators are ==,!= for all data-types and &gt;,&gt;=,&lt;,&lt;= for numeric data-types
  */
-public class TriggeredScraperImpl implements Scraper {
+public class TriggeredScraperImpl implements Scraper, TriggeredScraperImplMBean {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperImpl.class);
+    public static final String MX_DOMAIN = "org.apache.plc4x.java";
+
 
     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
         new BasicThreadFactory.Builder()
@@ -77,6 +81,7 @@ public class TriggeredScraperImpl implements Scraper {
     private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> futures = new ArrayListValuedHashMap<>();
     private final PlcDriverManager driverManager;
     private final List<ScrapeJob> jobs;
+    private MBeanServer mBeanServer;
 
     /**
      * Creates a Scraper instance from a configuration.
@@ -88,6 +93,21 @@ public class TriggeredScraperImpl implements Scraper {
         this(resultHandler, createPooledDriverManager(), config.getJobs());
     }
 
+
+    public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
+        this.resultHandler = resultHandler;
+        Validate.notEmpty(jobs);
+        this.driverManager = driverManager;
+        this.jobs = jobs;
+        // Register MBean
+        mBeanServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            mBeanServer.registerMBean(this, new ObjectName(MX_DOMAIN, "scraper", "scraper"));
+        } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+            LOGGER.debug("Unable to register Scraper as MBean", e);
+        }
+    }
+
     /**
      * Min Idle per Key is set to 1 for situations where the network is broken.
      * Then, on reconnect we can fail all getConnection calls (in the ScraperTask) fast until
@@ -103,14 +123,6 @@ public class TriggeredScraperImpl implements Scraper {
         });
     }
 
-
-    public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
-        this.resultHandler = resultHandler;
-        Validate.notEmpty(jobs);
-        this.driverManager = driverManager;
-        this.jobs = jobs;
-    }
-
     /**
      * Start the scraping.
      */
@@ -139,6 +151,8 @@ public class TriggeredScraperImpl implements Scraper {
                             executorService,
                             resultHandler,
                             (TriggeredScrapeJobImpl) tuple.getLeft());
+                        // Register task mxbean
+                        registerTaskMBean(task);
                         // Add task to internal list
                         tasks.put(tuple.getLeft(), task);
                         ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task,
@@ -168,11 +182,19 @@ public class TriggeredScraperImpl implements Scraper {
         }, 1_000, 1_000, TimeUnit.MILLISECONDS);
     }
 
-    @Override
-    public int getNumberOfActiveTasks() {
-        return 0;
+    /**
+     * Register a task as MBean
+     * @param task task to register
+     */
+    private void registerTaskMBean(ScraperTask task) {
+        try {
+            mBeanServer.registerMBean(task, new ObjectName(MX_DOMAIN + ":type=ScrapeTask,name=" + task.getJobName() + "-" + task.getConnectionAlias()));
+        } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+            LOGGER.debug("Unable to register Task as MBean", e);
+        }
     }
 
+
     @Override
     public void stop() {
         // Stop all futures
@@ -184,4 +206,17 @@ public class TriggeredScraperImpl implements Scraper {
         // Clear the map
         futures.clear();
     }
+
+
+    // MBean methods
+    @Override
+    public boolean isRunning() {
+        // TODO is this okay so?
+        return !futures.isEmpty();
+    }
+
+    @Override
+    public int getNumberOfActiveTasks() {
+        return (int) futures.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
+    }
 }
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
similarity index 71%
copy from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
copy to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
index cd9ab7f..f9bb5e4 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
@@ -17,25 +17,17 @@
  * under the License.
  */
 
-package org.apache.plc4x.java.scraper;
+package org.apache.plc4x.java.scraper.triggeredscraper;
+
+import org.apache.plc4x.java.scraper.Scraper;
 
 /**
- * Main interface that orchestrates scraping.
+ * MBean for {@link Scraper}
  */
-public interface Scraper {
-    /**
-     * Start the scraping.
-     */
-    void start();
+public interface TriggeredScraperImplMBean {
+
+    boolean isRunning();
 
-    /**
-     * retrieves active tasks used for scraping
-     * @return number of active tasks
-     */
     int getNumberOfActiveTasks();
 
-    /**
-     * stops active scraping processes
-     */
-    void stop();
 }
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
index 56549a5..1032c26 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
@@ -48,7 +48,7 @@ import java.util.stream.Collectors;
  * performs the triggered task from a job for one device based on the TriggerHandler as defined in Configuration
  * ToDo Implement the monitoring as well: PLC4X-90
  */
-public class TriggeredScraperTask implements ScraperTask {
+public class TriggeredScraperTask implements ScraperTask, TriggeredScraperTaskMBean {
     private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperTask.class);
 
     private final PlcDriverManager driverManager;
@@ -169,42 +169,37 @@ public class TriggeredScraperTask implements ScraperTask {
 
     @Override
     public String getJobName() {
-        return null;
+        return this.jobName;
     }
 
     @Override
     public String getConnectionAlias() {
-        return null;
+        return this.connectionAlias;
     }
 
     @Override
     public long getRequestCounter() {
-        return 0;
+        return this.requestCounter.get();
     }
 
     @Override
     public long getSuccessfullRequestCounter() {
-        return 0;
+        return this.successCounter.get();
     }
 
     @Override
     public DescriptiveStatistics getLatencyStatistics() {
-        return null;
-    }
-
-    @Override
-    public double getPercentageFailed() {
-        return 0;
+        return this.latencyStatistics;
     }
 
     @Override
     public void handleException(Exception e) {
-
+        // TODO Implement this
     }
 
     @Override
     public void handleErrorResponse(Map<String, PlcResponseCode> failed) {
-
+        // TODO Implement this
     }
 
     public PlcDriverManager getDriverManager() {
@@ -222,4 +217,31 @@ public class TriggeredScraperTask implements ScraperTask {
     public long getRequestTimeoutMs() {
         return requestTimeoutMs;
     }
+
+    //---------------------------------
+    // JMX Monitoring
+    //---------------------------------
+    @Override
+    public long getScrapesTotal() {
+        return requestCounter.get();
+    }
+
+    @Override
+    public long getScrapesSuccess() {
+        return successCounter.get();
+    }
+
+    @Override
+    public double getPercentageFailed() {
+        return (double)this.getScrapesSuccess()/this.getScrapesTotal() * 100.0;
+    }
+
+    @Override
+    public String[] getPercentiles() {
+        String[] percentiles = new String[10];
+        for (int i = 1; i <= 10; i += 1) {
+            percentiles[i - 1] = String.format("%d%%: %s ms", 10 * i, latencyStatistics.getPercentile(10.0 * i) * 1e-6);
+        }
+        return percentiles;
+    }
 }
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
similarity index 68%
copy from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
copy to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
index cd9ab7f..33c1e98 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
@@ -17,25 +17,19 @@
  * under the License.
  */
 
-package org.apache.plc4x.java.scraper;
+package org.apache.plc4x.java.scraper.triggeredscraper;
 
 /**
- * Main interface that orchestrates scraping.
+ * MBean for a scrape job.
  */
-public interface Scraper {
-    /**
-     * Start the scraping.
-     */
-    void start();
+public interface TriggeredScraperTaskMBean {
 
-    /**
-     * retrieves active tasks used for scraping
-     * @return number of active tasks
-     */
-    int getNumberOfActiveTasks();
+    long getScrapesTotal();
+
+    long getScrapesSuccess();
+
+    double getPercentageFailed();
+
+    String[] getPercentiles();
 
-    /**
-     * stops active scraping processes
-     */
-    void stop();
 }