You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2020/05/08 20:36:12 UTC

[incubator-pinot] branch mock-event-generator created (now 7ee05d1)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a change to branch mock-event-generator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 7ee05d1  Mock event generator for demo purposes - pseudo-random distribution-based generator similar to mock data source - configuration via detector.yml - one-time execution at statup in backend with db-persistence and deduplication

This branch includes the following new commits:

     new 7ee05d1  Mock event generator for demo purposes - pseudo-random distribution-based generator similar to mock data source - configuration via detector.yml - one-time execution at statup in backend with db-persistence and deduplication

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Mock event generator for demo purposes - pseudo-random distribution-based generator similar to mock data source - configuration via detector.yml - one-time execution at statup in backend with db-persistence and deduplication

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch mock-event-generator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7ee05d198e248b1c0138aa8e9d7726ec55ef66b5
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Fri May 8 13:32:26 2020 -0700

    Mock event generator for demo purposes
    - pseudo-random distribution-based generator similar to mock data source
    - configuration via detector.yml
    - one-time execution at statup in backend with db-persistence and deduplication
---
 thirdeye/thirdeye-pinot/config/detector.yml        |  33 ++++++
 thirdeye/thirdeye-pinot/config/rca.yml             |  26 ++++-
 .../anomaly/MockEventsLoaderConfiguration.java     |  76 ++++++++++++
 .../anomaly/ThirdEyeAnomalyApplication.java        |   6 +
 .../anomaly/ThirdEyeAnomalyConfiguration.java      |  18 +++
 .../thirdeye/anomaly/events/MockEventsLoader.java  | 129 +++++++++++++++++++++
 .../thirdeye/datalayer/entity/EventIndex.java      |   8 +-
 .../datasource/mock/MockThirdEyeDataSource.java    |   4 +-
 .../rootcause/impl/ThirdEyeEventEntity.java        |   2 +-
 .../rootcause/impl/ThirdEyeEventsPipeline.java     |   7 +-
 10 files changed, 294 insertions(+), 15 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/config/detector.yml b/thirdeye/thirdeye-pinot/config/detector.yml
index 43cfe5a..8a9de0a 100644
--- a/thirdeye/thirdeye-pinot/config/detector.yml
+++ b/thirdeye/thirdeye-pinot/config/detector.yml
@@ -17,6 +17,7 @@ alert: false
 autoload: false
 classifier: false
 holidayEventsLoader: false
+mockEventsLoader: true
 monitor: false
 pinotProxy: false
 scheduler: false
@@ -82,3 +83,35 @@ holidayEventsLoaderConfiguration:
        - "en.vietnamese#holiday@group.v.calendar.google.com"
   holidayLoadRange: 2592000000
   runFrequency: 7
+mockEventsLoaderConfiguration:
+  generators:
+    - type: HOLIDAY
+      arrivalType: exponential
+      arrivalMean: 86400000
+      durationType: fixed
+      durationMean: 86400000
+      seed: 0
+    - type: INFORMED
+      arrivalType: exponential
+      arrivalMean: 43200000
+      durationType: exponential
+      durationMean: 3600000
+      seed: 1
+    - type: CM
+      arrivalType: exponential
+      arrivalMean: 21600000
+      durationType: fixed
+      durationMean: 1800000
+      seed: 2
+    - type: CUSTOM
+      arrivalType: exponential
+      arrivalMean: 432000000
+      durationType: exponential
+      durationMean: 86400000
+      seed: 3
+    - type: LIX
+      arrivalType: exponential
+      arrivalMean: 259200000
+      durationType: exponential
+      durationMean: 604800000
+      seed: 4
diff --git a/thirdeye/thirdeye-pinot/config/rca.yml b/thirdeye/thirdeye-pinot/config/rca.yml
index 0c3cb2d..aecfd76 100644
--- a/thirdeye/thirdeye-pinot/config/rca.yml
+++ b/thirdeye/thirdeye-pinot/config/rca.yml
@@ -31,7 +31,11 @@ frameworks:
     eventExperiment:
         - outputName: OUTPUT
           inputNames: [INPUT]
-          className: org.apache.pinot.thirdeye.rootcause.impl.EmptyPipeline
+          className: org.apache.pinot.thirdeye.rootcause.impl.ThirdEyeEventsPipeline
+          properties:
+            strategy: COMPOUND
+            k: 500
+            eventType: LIX
 
     eventHoliday:
         - outputName: METRIC_RELATED
@@ -44,7 +48,7 @@ frameworks:
           properties:
               strategy: COMPOUND
               k: 500
-              eventType: holiday
+              eventType: HOLIDAY
 
     eventCustom:
         - outputName: OUTPUT
@@ -53,7 +57,7 @@ frameworks:
           properties:
               strategy: COMPOUND
               k: 500
-              eventType: custom
+              eventType: CUSTOM
 
     eventAnomaly:
         - outputName: OUTPUT
@@ -73,10 +77,22 @@ frameworks:
     eventChange:
         - outputName: OUTPUT
           inputNames: [INPUT]
-          className: org.apache.pinot.thirdeye.rootcause.impl.EmptyPipeline
+          className: org.apache.pinot.thirdeye.rootcause.impl.ThirdEyeEventsPipeline
+          properties:
+            strategy: COMPOUND
+            k: 500
+            eventType: CM
 
     eventDeployment:
         - outputName: OUTPUT
           inputNames: [INPUT]
-          className: org.apache.pinot.thirdeye.rootcause.impl.EmptyPipeline
+          className: org.apache.pinot.thirdeye.rootcause.impl.ThirdEyeEventsPipeline
+          properties:
+            strategy: COMPOUND
+            k: 500
+            eventType: INFORMED
 
+    eventAC:
+      - outputName: OUTPUT
+        inputNames: [INPUT]
+        className: org.apache.pinot.thirdeye.rootcause.impl.EmptyPipeline
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/MockEventsLoaderConfiguration.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/MockEventsLoaderConfiguration.java
new file mode 100644
index 0000000..4241aeb
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/MockEventsLoaderConfiguration.java
@@ -0,0 +1,76 @@
+package org.apache.pinot.thirdeye.anomaly;
+
+
+import org.apache.pinot.thirdeye.anomaly.events.MockEventsLoader;
+
+import java.util.Collections;
+import java.util.List;
+
+public class MockEventsLoaderConfiguration {
+    public static class EventGeneratorConfig {
+        String type;
+        String arrivalType = MockEventsLoader.DIST_TYPE_EXPONENTIAL;
+        double arrivalMean;
+        String durationType = MockEventsLoader.DIST_TYPE_FIXED;
+        double durationMean = 86400000;
+        int seed = 0;
+
+        public String getType() {
+            return type;
+        }
+
+        public void setType(String type) {
+            this.type = type;
+        }
+
+        public String getArrivalType() {
+            return arrivalType;
+        }
+
+        public void setArrivalType(String arrivalType) {
+            this.arrivalType = arrivalType;
+        }
+
+        public double getArrivalMean() {
+            return arrivalMean;
+        }
+
+        public void setArrivalMean(double arrivalMean) {
+            this.arrivalMean = arrivalMean;
+        }
+
+        public String getDurationType() {
+            return durationType;
+        }
+
+        public void setDurationType(String durationType) {
+            this.durationType = durationType;
+        }
+
+        public double getDurationMean() {
+            return durationMean;
+        }
+
+        public void setDurationMean(double durationMean) {
+            this.durationMean = durationMean;
+        }
+
+        public int getSeed() {
+            return seed;
+        }
+
+        public void setSeed(int seed) {
+            this.seed = seed;
+        }
+    }
+
+    List<EventGeneratorConfig> generators = Collections.emptyList();
+
+    public List<EventGeneratorConfig> getGenerators() {
+        return generators;
+    }
+
+    public void setGenerators(List<EventGeneratorConfig> generators) {
+        this.generators = generators;
+    }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/ThirdEyeAnomalyApplication.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/ThirdEyeAnomalyApplication.java
index 08126dd..23f979c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/ThirdEyeAnomalyApplication.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/ThirdEyeAnomalyApplication.java
@@ -28,6 +28,7 @@ import org.apache.pinot.thirdeye.anomaly.detection.trigger.DataAvailabilityEvent
 import org.apache.pinot.thirdeye.anomaly.detection.trigger.DataAvailabilityTaskScheduler;
 import org.apache.pinot.thirdeye.anomaly.events.HolidayEventResource;
 import org.apache.pinot.thirdeye.anomaly.events.HolidayEventsLoader;
+import org.apache.pinot.thirdeye.anomaly.events.MockEventsLoader;
 import org.apache.pinot.thirdeye.anomaly.monitor.MonitorJobScheduler;
 import org.apache.pinot.thirdeye.anomaly.task.TaskDriver;
 import org.apache.pinot.thirdeye.anomalydetection.alertFilterAutotune.AlertFilterAutotuneFactory;
@@ -73,6 +74,7 @@ public class ThirdEyeAnomalyApplication
   private ClassificationJobScheduler classificationJobScheduler = null;
   private EmailResource emailResource = null;
   private HolidayEventsLoader holidayEventsLoader = null;
+  private MockEventsLoader mockEventsLoader = null;
   private RequestStatisticsLogger requestStatisticsLogger = null;
   private DataAvailabilityEventListenerDriver dataAvailabilityEventListenerDriver = null;
   private DataAvailabilityTaskScheduler dataAvailabilityTaskScheduler = null;
@@ -170,6 +172,10 @@ public class ThirdEyeAnomalyApplication
           holidayEventsLoader.start();
           environment.jersey().register(new HolidayEventResource(holidayEventsLoader));
         }
+        if (config.isMockEventsLoader()) {
+          mockEventsLoader = new MockEventsLoader(config.getMockEventsLoaderConfiguration(), DAORegistry.getInstance().getEventDAO());
+          mockEventsLoader.run();
+        }
         if (config.isDataCompleteness()) {
           dataCompletenessScheduler = new DataCompletenessScheduler();
           dataCompletenessScheduler.start();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/ThirdEyeAnomalyConfiguration.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/ThirdEyeAnomalyConfiguration.java
index 8c8ff52..23d3b83 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/ThirdEyeAnomalyConfiguration.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/ThirdEyeAnomalyConfiguration.java
@@ -34,6 +34,7 @@ public class ThirdEyeAnomalyConfiguration extends ThirdEyeConfiguration {
   private boolean classifier = false;
   private boolean dataCompleteness = false;
   private boolean holidayEventsLoader = false;
+  private boolean mockEventsLoader = false;
   private boolean monitor = false;
   private boolean pinotProxy = false;
   private boolean scheduler = false;
@@ -46,6 +47,7 @@ public class ThirdEyeAnomalyConfiguration extends ThirdEyeConfiguration {
   private long id;
   private String dashboardHost;
   private HolidayEventsLoaderConfiguration holidayEventsLoaderConfiguration = new HolidayEventsLoaderConfiguration();
+  private MockEventsLoaderConfiguration mockEventsLoaderConfiguration = new MockEventsLoaderConfiguration();
   private MonitorConfiguration monitorConfiguration = new MonitorConfiguration();
   private AutoOnboardConfiguration autoOnboardConfiguration = new AutoOnboardConfiguration();
   private TaskDriverConfiguration taskDriverConfiguration = new TaskDriverConfiguration();
@@ -63,6 +65,22 @@ public class ThirdEyeAnomalyConfiguration extends ThirdEyeConfiguration {
     this.holidayEventsLoaderConfiguration = holidayEventsLoaderConfiguration;
   }
 
+  public boolean isMockEventsLoader() {
+    return mockEventsLoader;
+  }
+
+  public void setMockEventsLoader(boolean mockEventsLoader) {
+    this.mockEventsLoader = mockEventsLoader;
+  }
+
+  public MockEventsLoaderConfiguration getMockEventsLoaderConfiguration() {
+    return mockEventsLoaderConfiguration;
+  }
+
+  public void setMockEventsLoaderConfiguration(MockEventsLoaderConfiguration mockEventsLoaderConfiguration) {
+    this.mockEventsLoaderConfiguration = mockEventsLoaderConfiguration;
+  }
+
   public boolean isDetectionAlert() {
     return detectionAlert;
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/events/MockEventsLoader.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/events/MockEventsLoader.java
new file mode 100644
index 0000000..62f5b79
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/events/MockEventsLoader.java
@@ -0,0 +1,129 @@
+package org.apache.pinot.thirdeye.anomaly.events;
+
+import org.apache.commons.math3.distribution.*;
+import org.apache.commons.math3.random.Well19937c;
+import org.apache.pinot.thirdeye.anomaly.MockEventsLoaderConfiguration;
+import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
+import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+public class MockEventsLoader implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(MockEventsLoader.class);
+
+    private static Comparator<EventDTO> MOCK_EVENT_COMPARATOR = Comparator
+            .comparingLong(EventDTO::getStartTime)
+            .thenComparingLong(EventDTO::getEndTime)
+            .thenComparing(EventDTO::getEventType)
+            .thenComparing(EventDTO::getName);
+
+    private static final long START_TIMESTAMP = 1546300800000L; // January 1, 2019 12:00:00 AM GMT
+    private static final long END_OFFSET = 31536000000L; // 365d in ms
+
+    public static final String DIST_TYPE_GAUSSIAN = "gaussian";
+    public static final String DIST_TYPE_EXPONENTIAL = "exponential";
+    public static final String DIST_TYPE_LOGNORMAL = "lognormal";
+    public static final String DIST_TYPE_FIXED = "fixed";
+
+    MockEventsLoaderConfiguration configuration;
+    EventManager eventDAO;
+
+    public MockEventsLoader(MockEventsLoaderConfiguration configuration, EventManager eventDAO) {
+        this.configuration = configuration;
+        this.eventDAO = eventDAO;
+    }
+
+    @Override
+    public void run() {
+        final long cutoff = System.currentTimeMillis() + END_OFFSET;
+
+        for (MockEventsLoaderConfiguration.EventGeneratorConfig conf : this.configuration.getGenerators()) {
+            LOG.info("Generating '{}' events from {} to {}", conf.getType(), START_TIMESTAMP, cutoff);
+
+            List<EventDTO> generated = generateEvents(conf, cutoff);
+            List<EventDTO> existing = this.eventDAO.findEventsBetweenTimeRange(conf.getType(), START_TIMESTAMP, cutoff);
+
+            Set<EventDTO> deduplicated = dedup(generated, existing);
+            LOG.info("Generated '{}' events: {} generated, {} pre-existing, {} saved after deduplication",
+                    conf.getType(), generated.size(), existing.size(), deduplicated.size());
+
+            deduplicated.forEach(this.eventDAO::save);
+        }
+    }
+
+    List<EventDTO> generateEvents(MockEventsLoaderConfiguration.EventGeneratorConfig conf, long cutoff) {
+        List<EventDTO> generated = new ArrayList<>();
+
+        AbstractRealDistribution arrivalDist = makeDist(conf.getArrivalType(), conf.getArrivalMean(), conf.getSeed());
+        AbstractRealDistribution durationDist = makeDist(conf.getDurationType(), conf.getDurationMean(), conf.getSeed());
+
+        EventGenerator generator = new EventGenerator(conf.getType(), arrivalDist, durationDist);
+
+        EventDTO event;
+        while ((event = generator.next()).getStartTime() < cutoff) {
+            generated.add(event);
+        }
+
+        return generated;
+    }
+
+    Set<EventDTO> dedup(Collection<EventDTO> generated, Collection<EventDTO> existing) {
+        Set<EventDTO> sorted = new TreeSet<>(MOCK_EVENT_COMPARATOR);
+        sorted.addAll(generated);
+        existing.forEach(sorted::remove);
+        return sorted;
+    }
+
+    AbstractRealDistribution makeDist(String type, double param, int seed) {
+        switch (type.toLowerCase()) {
+            case DIST_TYPE_FIXED:
+                return new UniformRealDistribution(param, param + 0.001);
+            case DIST_TYPE_GAUSSIAN:
+                return new NormalDistribution(new Well19937c(seed), param, 1.0d, 1.0E-9D);
+            case DIST_TYPE_EXPONENTIAL:
+                return new ExponentialDistribution(new Well19937c(seed), param, 1.0E-9D);
+            case DIST_TYPE_LOGNORMAL:
+                return new LogNormalDistribution(new Well19937c(seed), param, 1.0d, 1.0E-9D);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported distribution type '%s'", type));
+        }
+    }
+
+    static class EventGenerator {
+        String type;
+        AbstractRealDistribution arrivalDist;
+        AbstractRealDistribution durationDist;
+
+        int eventCount = 0;
+        long lastTimestamp = START_TIMESTAMP;
+
+        public EventGenerator(String type, AbstractRealDistribution arrivalDist, AbstractRealDistribution durationDist) {
+            this.type = type;
+            this.arrivalDist = arrivalDist;
+            this.durationDist = durationDist;
+        }
+
+        public EventDTO next() {
+            long arrival = lastTimestamp + (long) this.arrivalDist.sample();
+            long duration = (long) this.durationDist.sample();
+
+            this.lastTimestamp = arrival;
+
+            EventDTO event = new EventDTO();
+            event.setStartTime(arrival);
+            event.setEndTime(arrival + duration);
+            event.setName(this.makeName());
+            event.setEventType(this.type.toUpperCase());
+            event.setTargetDimensionMap(Collections.emptyMap());
+
+            return event;
+        }
+
+        String makeName() {
+            return String.format("%s-%d", this.type, ++this.eventCount);
+        }
+    }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/EventIndex.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/EventIndex.java
index 108141e..f3fb663 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/EventIndex.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/EventIndex.java
@@ -19,11 +19,9 @@
 
 package org.apache.pinot.thirdeye.datalayer.entity;
 
-import org.apache.pinot.thirdeye.anomaly.events.EventType;
-
 public class EventIndex extends AbstractIndexEntity {
   String name;
-  EventType eventType;
+  String eventType;
   long startTime;
   long endTime;
   String metricName;
@@ -45,11 +43,11 @@ public class EventIndex extends AbstractIndexEntity {
     this.endTime = endTime;
   }
 
-  public EventType getEventType() {
+  public String getEventType() {
     return eventType;
   }
 
-  public void setEventType(EventType eventType) {
+  public void setEventType(String eventType) {
     this.eventType = eventType;
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/mock/MockThirdEyeDataSource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/mock/MockThirdEyeDataSource.java
index a022b0f..0731e84 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/mock/MockThirdEyeDataSource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/mock/MockThirdEyeDataSource.java
@@ -94,8 +94,6 @@ public class MockThirdEyeDataSource implements ThirdEyeDataSource {
    * @throws Exception if properties cannot be parsed
    */
   public MockThirdEyeDataSource(Map<String, Object> properties) throws Exception {
-    loadMockCSVData(properties);
-
     // datasets
     this.datasets = new HashMap<>();
     Map<String, Object> config = ConfigUtils.getMap(properties.get(DATASETS));
@@ -214,6 +212,8 @@ public class MockThirdEyeDataSource implements ThirdEyeDataSource {
 
       onboarding.runAdhoc();
     }
+
+    loadMockCSVData(properties);
   }
 
   private void loadMockCSVData(Map<String, Object> properties) throws Exception {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/impl/ThirdEyeEventEntity.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/impl/ThirdEyeEventEntity.java
index 6ec1459..9c8e0c8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/impl/ThirdEyeEventEntity.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/impl/ThirdEyeEventEntity.java
@@ -19,9 +19,9 @@
 
 package org.apache.pinot.thirdeye.rootcause.impl;
 
-import org.apache.pinot.thirdeye.anomaly.events.EventType;
 import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
 import org.apache.pinot.thirdeye.rootcause.Entity;
+
 import java.util.ArrayList;
 import java.util.List;
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/impl/ThirdEyeEventsPipeline.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/impl/ThirdEyeEventsPipeline.java
index f70b0d8..24e4a47 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/impl/ThirdEyeEventsPipeline.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/rootcause/impl/ThirdEyeEventsPipeline.java
@@ -19,6 +19,7 @@
 
 package org.apache.pinot.thirdeye.rootcause.impl;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
 import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
@@ -36,6 +37,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 import org.apache.commons.collections4.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,7 +142,7 @@ public class ThirdEyeEventsPipeline extends Pipeline {
     return this.eventDAO.findByPredicate(Predicate.AND(
         Predicate.GE("startTime", start - OVERFETCH),
         Predicate.LT("endTime", end + OVERFETCH),
-        Predicate.EQ("eventType", eventType.toUpperCase())
+        Predicate.EQ("eventType", this.eventType.toUpperCase())
     ));
   }
 
@@ -160,7 +163,7 @@ public class ThirdEyeEventsPipeline extends Pipeline {
         }
       }
 
-      ThirdEyeEventEntity entity = ThirdEyeEventEntity.fromDTO(1.0, related, dto, eventType);
+      ThirdEyeEventEntity entity = ThirdEyeEventEntity.fromDTO(1.0, related, dto, this.eventType.toLowerCase());
       entities.add(entity.withScore(strategy.score(entity) * coefficient));
     }
     return entities;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org