You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by tm...@apache.org on 2019/08/10 15:08:13 UTC
[plc4x] 04/05: changes due to api change of scraper
This is an automated email from the ASF dual-hosted git repository.
tmitsch pushed a commit to branch feature/improve-scraper-tim
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 2db314ad7dc700f2c19c1a64a969f6f5ea627fe4
Author: Tim Mitsch <t....@pragmaticindustries.de>
AuthorDate: Wed May 8 17:08:18 2019 +0200
changes due to api change of scraper
---
.../triggeredscraper/TriggeredScraperImpl.java | 43 +++++++++++++++++++---
...erImplMBean.java => TriggeredScraperMBean.java} | 2 +-
.../triggeredscraper/TriggeredScraperImplTest.java | 5 ++-
3 files changed, 43 insertions(+), 7 deletions(-)
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 4b76214..29cd7f4 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@ -23,7 +23,6 @@ import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.commons.lang3.tuple.Triple;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
@@ -42,6 +41,8 @@ import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.*;
+import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -60,9 +61,11 @@ import java.util.stream.Collectors;
* right now boolean variables as well as numeric variables could be used as data-types
* available comparators are ==,!= for all data-types and >,>=,<,<= for numeric data-types
*/
-public class TriggeredScraperImpl implements Scraper {
+public class TriggeredScraperImpl implements Scraper, TriggeredScraperMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperImpl.class);
+ public static final String MX_DOMAIN = "org.apache.plc4x.java";
+
private static final int DEFAULT_FUTURE_TIME_OUT = 2000;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
@@ -84,6 +87,7 @@ public class TriggeredScraperImpl implements Scraper {
private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> scraperTaskMap = new ArrayListValuedHashMap<>();
private final PlcDriverManager driverManager;
private final List<ScrapeJob> jobs;
+ private MBeanServer mBeanServer;
private long futureTimeOut;
@@ -124,8 +128,16 @@ public class TriggeredScraperImpl implements Scraper {
this.jobs = jobs;
this.triggerCollector = triggerCollector;
this.futureTimeOut = futureTimeOut;
+ // Register MBean
+ mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ mBeanServer.registerMBean(this, new ObjectName(MX_DOMAIN, "scraper", "scraper"));
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+ LOGGER.debug("Unable to register Scraper as MBean", e);
+ }
}
+
/**
* Min Idle per Key is set to 1 for situations where the network is broken.
* Then, on reconnect we can fail all getConnection calls (in the ScraperTask) fast until
@@ -176,6 +188,7 @@ public class TriggeredScraperImpl implements Scraper {
// Add task to internal list
LOGGER.info("Task {} added to scheduling", triggeredScraperTask);
+ registerTaskMBean(triggeredScraperTask);
tasks.put(job, triggeredScraperTask);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(triggeredScraperTask, 0, job.getScrapeRate(), TimeUnit.MILLISECONDS);
@@ -203,9 +216,16 @@ public class TriggeredScraperImpl implements Scraper {
}, 1_000, 1_000, TimeUnit.MILLISECONDS);
}
- @Override
- public int getNumberOfActiveTasks() {
- return 0;
+ /**
+ * Register a task as MBean
+ * @param task task to register
+ */
+ private void registerTaskMBean(ScraperTask task) {
+ try {
+ mBeanServer.registerMBean(task, new ObjectName(MX_DOMAIN + ":type=ScrapeTask,name=" + task.getJobName() + "-" + task.getConnectionAlias()));
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+ LOGGER.debug("Unable to register Task as MBean", e);
+ }
}
@Override
@@ -296,4 +316,17 @@ public class TriggeredScraperImpl implements Scraper {
plcReadResponse::getObject
));
}
+
+
+ // MBean methods
+ @Override
+ public boolean isRunning() {
+ // TODO: confirm that a non-empty task map is a sufficient definition of "running"
+ return !scraperTaskMap.isEmpty();
+ }
+
+ @Override
+ public int getNumberOfActiveTasks() {
+ return (int) scraperTaskMap.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
+ }
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
similarity index 95%
rename from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
rename to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
index f9bb5e4..b8e5232 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
@@ -24,7 +24,7 @@ import org.apache.plc4x.java.scraper.Scraper;
/**
* MBean for {@link Scraper}
*/
-public interface TriggeredScraperImplMBean {
+public interface TriggeredScraperMBean {
boolean isRunning();
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
index 039b895..c47b938 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
@@ -29,6 +29,8 @@ import org.apache.plc4x.java.mock.MockDevice;
import org.apache.plc4x.java.mock.PlcMockConnection;
import org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredScraperConfiguration;
import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -105,7 +107,8 @@ public class TriggeredScraperImplTest {
when(mockDevice2.read(eq("%DB810:DBW0:INT"))).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultLongFieldItem(4L)));
TriggeredScraperConfiguration configuration = TriggeredScraperConfiguration.fromFile("src/test/resources/mock-scraper-config.yml");
- TriggeredScraperImpl scraper = new TriggeredScraperImpl((j, a, m) -> System.out.println(String.format("Results from %s/%s: %s", j, a, m)), driverManager, configuration.getJobs());
+ TriggerCollector triggerCollector = new TriggerCollectorImpl(driverManager);
+ TriggeredScraperImpl scraper = new TriggeredScraperImpl((j, a, m) -> System.out.println(String.format("Results from %s/%s: %s", j, a, m)), driverManager, configuration.getJobs(),triggerCollector,1000);
scraper.start();