You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by tm...@apache.org on 2019/08/10 15:08:13 UTC

[plc4x] 04/05: changes due to api change of scraper

This is an automated email from the ASF dual-hosted git repository.

tmitsch pushed a commit to branch feature/improve-scraper-tim
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 2db314ad7dc700f2c19c1a64a969f6f5ea627fe4
Author: Tim Mitsch <t....@pragmaticindustries.de>
AuthorDate: Wed May 8 17:08:18 2019 +0200

    changes due to api change of scraper
---
 .../triggeredscraper/TriggeredScraperImpl.java     | 43 +++++++++++++++++++---
 ...erImplMBean.java => TriggeredScraperMBean.java} |  2 +-
 .../triggeredscraper/TriggeredScraperImplTest.java |  5 ++-
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 4b76214..29cd7f4 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@ -23,7 +23,6 @@ import org.apache.commons.collections4.MultiValuedMap;
 import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
 import org.apache.commons.lang3.Validate;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.commons.lang3.tuple.Triple;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
@@ -42,6 +41,8 @@ import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.*;
+import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -60,9 +61,11 @@ import java.util.stream.Collectors;
  *     right now boolean variables as well as numeric variables could be used as data-types
  *     available comparators are ==,!= for all data-types and &gt;,&gt;=,&lt;,&lt;= for numeric data-types
  */
-public class TriggeredScraperImpl implements Scraper {
+public class TriggeredScraperImpl implements Scraper, TriggeredScraperMBean {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperImpl.class);
+    public static final String MX_DOMAIN = "org.apache.plc4x.java";
+
     private static final int DEFAULT_FUTURE_TIME_OUT = 2000;
 
     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
@@ -84,6 +87,7 @@ public class TriggeredScraperImpl implements Scraper {
     private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> scraperTaskMap = new ArrayListValuedHashMap<>();
     private final PlcDriverManager driverManager;
     private final List<ScrapeJob> jobs;
+    private MBeanServer mBeanServer;
 
     private long futureTimeOut;
 
@@ -124,8 +128,16 @@ public class TriggeredScraperImpl implements Scraper {
         this.jobs = jobs;
         this.triggerCollector = triggerCollector;
         this.futureTimeOut = futureTimeOut;
+        // Register MBean
+        mBeanServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            mBeanServer.registerMBean(this, new ObjectName(MX_DOMAIN, "scraper", "scraper"));
+        } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+            LOGGER.debug("Unable to register Scraper as MBean", e);
+        }
     }
 
+
     /**
      * Min Idle per Key is set to 1 for situations where the network is broken.
      * Then, on reconnect we can fail all getConnection calls (in the ScraperTask) fast until
@@ -176,6 +188,7 @@ public class TriggeredScraperImpl implements Scraper {
 
                     // Add task to internal list
                     LOGGER.info("Task {} added to scheduling", triggeredScraperTask);
+                    registerTaskMBean(triggeredScraperTask);
                     tasks.put(job, triggeredScraperTask);
                     ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(triggeredScraperTask, 0, job.getScrapeRate(), TimeUnit.MILLISECONDS);
 
@@ -203,9 +216,16 @@ public class TriggeredScraperImpl implements Scraper {
         }, 1_000, 1_000, TimeUnit.MILLISECONDS);
     }
 
-    @Override
-    public int getNumberOfActiveTasks() {
-        return 0;
+    /**
+     * Registers the given task on {@code mBeanServer}; the object name is built from {@code MX_DOMAIN}, type {@code ScrapeTask}, the job name and the connection alias.
+     * @param task task to register; a failed registration is only logged at debug level and is not propagated to the caller
+     */
+    private void registerTaskMBean(ScraperTask task) {
+        try {
+            mBeanServer.registerMBean(task, new ObjectName(MX_DOMAIN + ":type=ScrapeTask,name=" + task.getJobName() + "-" + task.getConnectionAlias()));
+        } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+            LOGGER.debug("Unable to register Task as MBean", e);
+        }
     }
 
     @Override
@@ -296,4 +316,17 @@ public class TriggeredScraperImpl implements Scraper {
                 plcReadResponse::getObject
             ));
     }
+
+
+    // MBean methods
+    @Override
+    public boolean isRunning() {
+        // TODO(review): reports "running" whenever scraperTaskMap holds at least one entry, regardless of future state - confirm this is the intended definition
+        return !scraperTaskMap.isEmpty();
+    }
+
+    @Override
+    public int getNumberOfActiveTasks() {
+        return (int) scraperTaskMap.entries().stream().filter(entry -> !entry.getValue().isDone()).count(); // counts map entries whose future is not yet done
+    }
 }
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
similarity index 95%
rename from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
rename to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
index f9bb5e4..b8e5232 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplMBean.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
@@ -24,7 +24,7 @@ import org.apache.plc4x.java.scraper.Scraper;
 /**
  * MBean for {@link Scraper}
  */
-public interface TriggeredScraperImplMBean {
+public interface TriggeredScraperMBean {
 
     boolean isRunning();
 
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
index 039b895..c47b938 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
@@ -29,6 +29,8 @@ import org.apache.plc4x.java.mock.MockDevice;
 import org.apache.plc4x.java.mock.PlcMockConnection;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredScraperConfiguration;
 import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -105,7 +107,8 @@ public class TriggeredScraperImplTest {
         when(mockDevice2.read(eq("%DB810:DBW0:INT"))).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultLongFieldItem(4L)));
 
         TriggeredScraperConfiguration configuration = TriggeredScraperConfiguration.fromFile("src/test/resources/mock-scraper-config.yml");
-        TriggeredScraperImpl scraper = new TriggeredScraperImpl((j, a, m) -> System.out.println(String.format("Results from %s/%s: %s", j, a, m)), driverManager, configuration.getJobs());
+        TriggerCollector triggerCollector = new TriggerCollectorImpl(driverManager);
+        TriggeredScraperImpl scraper = new TriggeredScraperImpl((j, a, m) -> System.out.println(String.format("Results from %s/%s: %s", j, a, m)), driverManager, configuration.getJobs(),triggerCollector,1000);
 
         scraper.start();