You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/12/13 08:56:43 UTC

incubator-eagle git commit: EAGLE-837: Stream definition change does not reflect in AlertBolt

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 743de7330 -> 7c6315311


EAGLE-837: Stream definition change does not reflect in AlertBolt

Stream definition change only trigger router bolt & publisher update, we don't update corresponding alert bolt stream definition references. It will cause alert bolt still use old stream definition references, it could produce array index out of bound exception.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7c631531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7c631531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7c631531

Branch: refs/heads/master
Commit: 7c6315311f2f08992773d175d80314c449c7e9b6
Parents: 743de73
Author: Xiancheng Li <xi...@ebay.com>
Authored: Mon Dec 12 19:17:47 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Tue Dec 13 16:10:00 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/StreamColumn.java  | 32 ++++++++
 .../engine/coordinator/StreamDefinition.java    | 35 +++++++++
 .../coordinator/StreamDefinitionTest.java       |  4 +-
 .../eagle/alert/engine/runner/AlertBolt.java    | 20 ++++-
 .../alert/engine/router/TestAlertBolt.java      | 81 ++++++++++++++++++++
 5 files changed, 169 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 9705efc..ba736fe 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -21,9 +21,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Objects;
+
 import javax.xml.bind.annotation.adapters.XmlAdapter;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 public class StreamColumn implements Serializable {
 
     private static final long serialVersionUID = -5457861313624389106L;
@@ -39,6 +43,34 @@ public class StreamColumn implements Serializable {
             name, type, defaultValue, required, nodataExpression);
     }
 
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+            .append(this.name)
+            .append(this.type)
+            .append(this.defaultValue)
+            .append(this.required)
+            .append(this.description)
+            .append(this.nodataExpression)
+            .build();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof StreamColumn)) {
+            return false;
+        }
+        return Objects.equals(this.name, ((StreamColumn) obj).name)
+            && Objects.equals(this.type, ((StreamColumn) obj).type)
+            && Objects.equals(this.defaultValue, ((StreamColumn) obj).defaultValue)
+            && Objects.equals(this.required, ((StreamColumn) obj).required)
+            && Objects.equals(this.description, ((StreamColumn) obj).description)
+            && Objects.equals(this.nodataExpression, ((StreamColumn) obj).nodataExpression);
+    }
+
     public String getNodataExpression() {
         return nodataExpression;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
index 1be36f3..9512f1a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -18,9 +18,14 @@ package org.apache.eagle.alert.engine.coordinator;
 
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlElementWrapper;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * This is actually a data source schema.
@@ -62,6 +67,36 @@ public class StreamDefinition implements Serializable {
             columns);
     }
 
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+            .append(this.streamId)
+            .append(this.description)
+            .append(this.validate)
+            .append(this.timeseries)
+            .append(this.dataSource)
+            .append(this.siteId)
+            .append(this.columns)
+            .build();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof StreamDefinition)) {
+            return false;
+        }
+        return Objects.equals(this.streamId, ((StreamDefinition) obj).streamId)
+            && Objects.equals(this.description, ((StreamDefinition) obj).description)
+            && Objects.equals(this.validate, ((StreamDefinition) obj).validate)
+            && Objects.equals(this.timeseries, ((StreamDefinition) obj).timeseries)
+            && Objects.equals(this.dataSource, ((StreamDefinition) obj).dataSource)
+            && Objects.equals(this.siteId, ((StreamDefinition) obj).siteId)
+            && CollectionUtils.isEqualCollection(this.columns, ((StreamDefinition) obj).columns);
+    }
+
     public String getStreamId() {
         return streamId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
index b5015cd..e33ef07 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
@@ -45,8 +45,8 @@ public class StreamDefinitionTest {
         StreamDefinition streamDefinition1 = streamDefinition.copy();
         Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition1.toString());
 
-        Assert.assertFalse(streamDefinition1.equals(streamDefinition));
+        Assert.assertTrue(streamDefinition1.equals(streamDefinition));
         Assert.assertFalse(streamDefinition1 == streamDefinition);
-        Assert.assertFalse(streamDefinition1.hashCode() == streamDefinition.hashCode());
+        Assert.assertTrue(streamDefinition1.hashCode() == streamDefinition.hashCode());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 7d66f47..edf1b6f 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -103,7 +104,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
                 pe.getEvent().setMetaVersion(specVersion);
             } else if (streamEventVersion != null && !streamEventVersion.equals(specVersion)) {
                 if (specVersion != null && streamEventVersion != null
-                        && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) {
+                    && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) {
                     // check if specVersion is older than stream_event_version
                     // Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
                     // Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
@@ -195,6 +196,23 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         MapComparator<String, PolicyDefinition> comparator = new MapComparator<>(newPoliciesMap, cachedPolicies);
         comparator.compare();
 
+        MapComparator<String, StreamDefinition> streamComparator = new MapComparator<>(sds, sdf);
+        streamComparator.compare();
+
+        List<StreamDefinition> addOrUpdatedStreams = streamComparator.getAdded();
+        addOrUpdatedStreams.addAll(streamComparator.getModified());
+        List<PolicyDefinition> cachedPoliciesTemp = new ArrayList<>(cachedPolicies.values());
+        addOrUpdatedStreams.forEach(s -> {
+            cachedPoliciesTemp.stream().filter(p -> p.getInputStreams().contains(s.getStreamId())
+                || p.getOutputStreams().contains(s.getStreamId())).forEach(
+                    p -> {
+                        if (!comparator.getModified().contains(p) && !comparator.getAdded().contains(p)) {
+                            comparator.getModified().add(p);
+                        }
+                    });
+            ;
+        });
+
         policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds);
 
         // update alert output collector

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index e6acc3e..8ae29d5 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -46,6 +46,8 @@ import org.apache.eagle.common.DateTimeUtil;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
@@ -65,6 +67,8 @@ import static org.mockito.Mockito.when;
 public class TestAlertBolt {
 
     public static final String TEST_STREAM = "test-stream";
+    
+    private static final Logger LOG = LoggerFactory.getLogger(TestAlertBolt.class);
 
     /**
      * Following knowledge is guaranteed in
@@ -443,6 +447,83 @@ public class TestAlertBolt {
 
         Assert.assertTrue(recieved.get());
     }
+    
+    @Test
+    public void testStreamDefinitionChange() throws IOException {
+        PolicyDefinition def = new PolicyDefinition();
+        def.setName("policy-definition");
+        def.setInputStreams(Arrays.asList(TEST_STREAM));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
+        definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
+        definition.setValue("PT0M,plain,1,host,host1");
+        def.setDefinition(definition);
+        def.setPartitionSpec(Arrays.asList(createPartition()));
+
+        AlertBoltSpec boltSpecs = new AlertBoltSpec();
+
+        AtomicBoolean recieved = new AtomicBoolean(false);
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                recieved.set(true);
+                return Collections.emptyList();
+            }
+
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
+
+            @Override
+            public void ack(Tuple input) {
+            }
+
+            @Override
+            public void fail(Tuple input) {
+            }
+
+            @Override
+            public void reportError(Throwable error) {
+            }
+        });
+        AlertBolt bolt = createAlertBolt(collector);
+
+        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
+        boltSpecs.setVersion("spec_" + System.currentTimeMillis());
+        // stream def map
+        Map<String, StreamDefinition> sds = new HashMap();
+        StreamDefinition sdTest = new StreamDefinition();
+        sdTest.setStreamId(TEST_STREAM);
+        sds.put(sdTest.getStreamId(), sdTest);
+        
+        boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null);
+
+        bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
+        // how to assert
+        Tuple t = createTuple(bolt, boltSpecs.getVersion());
+
+        bolt.execute(t);
+
+        Assert.assertTrue(recieved.get());
+        
+        LOG.info("Update stream");
+        sds = new HashMap();
+        sdTest = new StreamDefinition();
+        sdTest.setStreamId(TEST_STREAM);
+        sds.put(sdTest.getStreamId(), sdTest);
+        sdTest.setDescription("update the stream");
+        bolt.onAlertBoltSpecChange(boltSpecs, sds);
+        
+        LOG.info("No any change");
+        sds = new HashMap();
+        sdTest = new StreamDefinition();
+        sdTest.setStreamId(TEST_STREAM);
+        sds.put(sdTest.getStreamId(), sdTest);
+        sdTest.setDescription("update the stream");
+        bolt.onAlertBoltSpecChange(boltSpecs, sds);
+    }
 
     @Test @Ignore
     public void testMultiStreamDefinition() throws Exception {