You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/05/08 11:31:43 UTC
[plc4x] 01/02: Implemeted JMX for the triggered scraper.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch feature/jmx-for-scraper-integration
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 0ff0ba28516d2b41d50d6976428698417fd1f6bd
Author: julian <j....@pragmaticminds.de>
AuthorDate: Wed May 8 11:07:03 2019 +0200
Implemeted JMX for the triggered scraper.
---
.../org/apache/plc4x/java/scraper/Scraper.java | 2 +-
.../triggeredscraper/TriggeredScraperImpl.java | 59 +++++++++++++++++-----
.../TriggeredScraperImplMBean.java} | 22 +++-----
.../triggeredscraper/TriggeredScraperTask.java | 48 +++++++++++++-----
.../TriggeredScraperTaskMBean.java} | 26 ++++------
5 files changed, 100 insertions(+), 57 deletions(-)
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index cd9ab7f..0fd7a2a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -22,7 +22,7 @@ package org.apache.plc4x.java.scraper;
/**
* Main interface that orchestrates scraping.
*/
-public interface Scraper {
+public interface Scraper{
/**
* Start the scraping.
*/
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 40ad5d5..46cf44f 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@ -37,6 +37,8 @@ import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.*;
+import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -54,9 +56,11 @@ import java.util.concurrent.*;
* right now boolean variables as well as numeric variables could be used as data-types
* available comparators are ==,!= for all data-types and >,>=,<,<= for numeric data-types
*/
-public class TriggeredScraperImpl implements Scraper {
+public class TriggeredScraperImpl implements Scraper, TriggeredScraperImplMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperImpl.class);
+ public static final String MX_DOMAIN = "org.apache.plc4x.java";
+
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
new BasicThreadFactory.Builder()
@@ -77,6 +81,7 @@ public class TriggeredScraperImpl implements Scraper {
private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> futures = new ArrayListValuedHashMap<>();
private final PlcDriverManager driverManager;
private final List<ScrapeJob> jobs;
+ private MBeanServer mBeanServer;
/**
* Creates a Scraper instance from a configuration.
@@ -88,6 +93,21 @@ public class TriggeredScraperImpl implements Scraper {
this(resultHandler, createPooledDriverManager(), config.getJobs());
}
+
+ public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
+ this.resultHandler = resultHandler;
+ Validate.notEmpty(jobs);
+ this.driverManager = driverManager;
+ this.jobs = jobs;
+ // Register MBean
+ mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ mBeanServer.registerMBean(this, new ObjectName(MX_DOMAIN, "scraper", "scraper"));
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+ LOGGER.debug("Unable to register Scraper as MBean", e);
+ }
+ }
+
/**
* Min Idle per Key is set to 1 for situations where the network is broken.
* Then, on reconnect we can fail all getConnection calls (in the ScraperTask) fast until
@@ -103,14 +123,6 @@ public class TriggeredScraperImpl implements Scraper {
});
}
-
- public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
- this.resultHandler = resultHandler;
- Validate.notEmpty(jobs);
- this.driverManager = driverManager;
- this.jobs = jobs;
- }
-
/**
* Start the scraping.
*/
@@ -139,6 +151,8 @@ public class TriggeredScraperImpl implements Scraper {
executorService,
resultHandler,
(TriggeredScrapeJobImpl) tuple.getLeft());
+ // Register task mxbean
+ registerTaskMBean(task);
// Add task to internal list
tasks.put(tuple.getLeft(), task);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task,
@@ -168,11 +182,19 @@ public class TriggeredScraperImpl implements Scraper {
}, 1_000, 1_000, TimeUnit.MILLISECONDS);
}
- @Override
- public int getNumberOfActiveTasks() {
- return 0;
+ /**
+ * Register a task as MBean
+ * @param task task to register
+ */
+ private void registerTaskMBean(ScraperTask task) {
+ try {
+ mBeanServer.registerMBean(task, new ObjectName(MX_DOMAIN + ":type=ScrapeTask,name=" + task.getJobName() + "-" + task.getConnectionAlias()));
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+ LOGGER.debug("Unable to register Task as MBean", e);
+ }
}
+
@Override
public void stop() {
// Stop all futures
@@ -184,4 +206,17 @@ public class TriggeredScraperImpl implements Scraper {
// Clear the map
futures.clear();
}
+
+
+ // MBean methods
+ @Override
+ public boolean isRunning() {
+ // TODO is this okay so?
+ return !futures.isEmpty();
+ }
+
+ @Override
+ public int getNumberOfActiveTasks() {
+ return (int) futures.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
+ }
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
similarity index 71%
copy from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
copy to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
index cd9ab7f..f9bb5e4 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
@@ -17,25 +17,17 @@
* under the License.
*/
-package org.apache.plc4x.java.scraper;
+package org.apache.plc4x.java.scraper.triggeredscraper;
+
+import org.apache.plc4x.java.scraper.Scraper;
/**
- * Main interface that orchestrates scraping.
+ * MBean for {@link Scraper}
*/
-public interface Scraper {
- /**
- * Start the scraping.
- */
- void start();
+public interface TriggeredScraperImplMBean {
+
+ boolean isRunning();
- /**
- * retrieves active tasks used for scraping
- * @return number of active tasks
- */
int getNumberOfActiveTasks();
- /**
- * stops active scraping processes
- */
- void stop();
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
index 56549a5..1032c26 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
@@ -48,7 +48,7 @@ import java.util.stream.Collectors;
* performs the triggered task from a job for one device based on the TriggerHandler as defined in Configuration
* ToDo Implement the monitoring as well: PLC4X-90
*/
-public class TriggeredScraperTask implements ScraperTask {
+public class TriggeredScraperTask implements ScraperTask, TriggeredScraperTaskMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperTask.class);
private final PlcDriverManager driverManager;
@@ -169,42 +169,37 @@ public class TriggeredScraperTask implements ScraperTask {
@Override
public String getJobName() {
- return null;
+ return this.jobName;
}
@Override
public String getConnectionAlias() {
- return null;
+ return this.connectionAlias;
}
@Override
public long getRequestCounter() {
- return 0;
+ return this.requestCounter.get();
}
@Override
public long getSuccessfullRequestCounter() {
- return 0;
+ return this.successCounter.get();
}
@Override
public DescriptiveStatistics getLatencyStatistics() {
- return null;
- }
-
- @Override
- public double getPercentageFailed() {
- return 0;
+ return this.latencyStatistics;
}
@Override
public void handleException(Exception e) {
-
+ // TODO Implement this
}
@Override
public void handleErrorResponse(Map<String, PlcResponseCode> failed) {
-
+ // TODO Implement this
}
public PlcDriverManager getDriverManager() {
@@ -222,4 +217,31 @@ public class TriggeredScraperTask implements ScraperTask {
public long getRequestTimeoutMs() {
return requestTimeoutMs;
}
+
+ //---------------------------------
+ // JMX Monitoring
+ //---------------------------------
+ @Override
+ public long getScrapesTotal() {
+ return requestCounter.get();
+ }
+
+ @Override
+ public long getScrapesSuccess() {
+ return successCounter.get();
+ }
+
+ @Override
+ public double getPercentageFailed() {
+ return (double)this.getScrapesSuccess()/this.getScrapesTotal() * 100.0;
+ }
+
+ @Override
+ public String[] getPercentiles() {
+ String[] percentiles = new String[10];
+ for (int i = 1; i <= 10; i += 1) {
+ percentiles[i - 1] = String.format("%d%%: %s ms", 10 * i, latencyStatistics.getPercentile(10.0 * i) * 1e-6);
+ }
+ return percentiles;
+ }
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
similarity index 68%
copy from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
copy to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
index cd9ab7f..33c1e98 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
@@ -17,25 +17,19 @@
* under the License.
*/
-package org.apache.plc4x.java.scraper;
+package org.apache.plc4x.java.scraper.triggeredscraper;
/**
- * Main interface that orchestrates scraping.
+ * MBean for a scrape job.
*/
-public interface Scraper {
- /**
- * Start the scraping.
- */
- void start();
+public interface TriggeredScraperTaskMBean {
- /**
- * retrieves active tasks used for scraping
- * @return number of active tasks
- */
- int getNumberOfActiveTasks();
+ long getScrapesTotal();
+
+ long getScrapesSuccess();
+
+ double getPercentageFailed();
+
+ String[] getPercentiles();
- /**
- * stops active scraping processes
- */
- void stop();
}