You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:46 UTC
[38/84] [partial] eagle git commit: Clean repo for eagle site
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
deleted file mode 100644
index c377e41..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.ListUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.hibernate.validator.constraints.Length;
-
-import java.io.Serializable;
-import java.util.*;
-
-/**
- * @since Apr 5, 2016.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PolicyDefinition implements Serializable {
- private static final long serialVersionUID = 377581499339572414L;
- // unique identifier
- @Length(min = 1, max = 50, message = "length should between 1 and 50")
- private String name;
- private String description;
- private List<String> inputStreams = new ArrayList<>();
- private List<String> outputStreams = new ArrayList<>();
- private String siteId = "default";
-
- private Definition definition;
- private Definition stateDefinition;
- private PolicyStatus policyStatus = PolicyStatus.ENABLED;
- private AlertDefinition alertDefinition;
-
- // one stream only have one partition in one policy, since we don't support stream alias
- private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
- private boolean dedicated;
-
- // runtime configuration for policy, these are user-invisible
- private int parallelismHint = 1;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public List<String> getInputStreams() {
- return inputStreams;
- }
-
- public void setInputStreams(List<String> inputStreams) {
- this.inputStreams = inputStreams;
- }
-
- public List<String> getOutputStreams() {
- return outputStreams;
- }
-
- public void setOutputStreams(List<String> outputStreams) {
- this.outputStreams = outputStreams;
- }
-
- public Definition getDefinition() {
- return definition;
- }
-
- public Definition getStateDefinition() {
- return stateDefinition;
- }
-
- public void setStateDefinition(Definition stateDefinition) {
- this.stateDefinition = stateDefinition;
- }
-
- public void setDefinition(Definition definition) {
- this.definition = definition;
- }
-
- public List<StreamPartition> getPartitionSpec() {
- return partitionSpec;
- }
-
- public void setPartitionSpec(List<StreamPartition> partitionSpec) {
- this.partitionSpec = partitionSpec;
- }
-
- public void addPartition(StreamPartition par) {
- this.partitionSpec.add(par);
- }
-
- public boolean isDedicated() {
- return dedicated;
- }
-
- public void setDedicated(boolean dedicated) {
- this.dedicated = dedicated;
- }
-
- public int getParallelismHint() {
- return parallelismHint;
- }
-
- public void setParallelismHint(int parallelism) {
- this.parallelismHint = parallelism;
- }
-
- public PolicyStatus getPolicyStatus() {
- return policyStatus;
- }
-
- public void setPolicyStatus(PolicyStatus policyStatus) {
- this.policyStatus = policyStatus;
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder()
- .append(siteId)
- .append(name)
- .append(inputStreams)
- .append(outputStreams)
- .append(definition)
- .append(partitionSpec)
- .append(policyStatus)
- .append(parallelismHint)
- .append(alertDefinition)
- .build();
- }
-
- @Override
- public boolean equals(Object that) {
- if (that == this) {
- return true;
- }
-
- if (!(that instanceof PolicyDefinition)) {
- return false;
- }
-
- PolicyDefinition another = (PolicyDefinition) that;
-
- if (Objects.equals(another.siteId, this.siteId)
- && Objects.equals(another.name, this.name)
- && Objects.equals(another.description, this.description)
- && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
- && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
- && (another.definition != null && another.definition.equals(this.definition))
- && Objects.equals(this.definition, another.definition)
- && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
- && another.policyStatus.equals(this.policyStatus)
- && another.parallelismHint == this.parallelismHint
- && Objects.equals(another.alertDefinition, alertDefinition)) {
- return true;
- }
- return false;
- }
-
- public AlertDefinition getAlertDefinition() {
- return alertDefinition;
- }
-
- public void setAlertDefinition(AlertDefinition alertDefinition) {
- this.alertDefinition = alertDefinition;
- }
-
- public AlertSeverity getAlertSeverity() {
- return alertDefinition == null ? null : alertDefinition.getSeverity();
- }
-
- public String getAlertCategory() {
- return alertDefinition == null ? null : alertDefinition.getCategory();
- }
-
- public String getSiteId() {
- return siteId;
- }
-
- public void setSiteId(String siteId) {
- this.siteId = siteId;
- }
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- public static class Definition implements Serializable {
- private static final long serialVersionUID = -622366527887848346L;
-
- public String type;
- public String value;
- public String handlerClass;
- public Map<String, Object> properties = new HashMap<>();
-
- private List<String> inputStreams = new ArrayList<String>();
- private List<String> outputStreams = new ArrayList<String>();
-
- public Definition(String type, String value) {
- this.type = type;
- this.value = value;
- }
-
- public Definition() {
- this.type = null;
- this.value = null;
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(type).append(value).build();
- }
-
- @Override
- public boolean equals(Object that) {
- if (that == this) {
- return true;
- }
- if (!(that instanceof Definition)) {
- return false;
- }
- Definition another = (Definition) that;
- if (another.type.equals(this.type)
- && another.value.equals(this.value)
- && ListUtils.isEqualList(another.inputStreams, this.inputStreams)
- && ListUtils.isEqualList(another.outputStreams, this.outputStreams)) {
- return true;
- }
- return false;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getValue() {
- return value;
- }
-
- public void setValue(String value) {
- this.value = value;
- }
-
- public void setInputStreams(List<String> inputStreams) {
- this.inputStreams = inputStreams;
- }
-
- public void setOutputStreams(List<String> outputStreams) {
- this.outputStreams = outputStreams;
- }
-
- public List<String> getInputStreams() {
- return inputStreams;
- }
-
- public List<String> getOutputStreams() {
- return outputStreams;
- }
-
- public String getHandlerClass() {
- return handlerClass;
- }
-
- public void setHandlerClass(String handlerClass) {
- this.handlerClass = handlerClass;
- }
-
- public Map<String, Object> getProperties() {
- return properties;
- }
-
- public void setProperties(Map<String, Object> properties) {
- this.properties = properties;
- }
-
- @Override
- public String toString() {
- return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }", type, value, inputStreams, outputStreams);
- }
- }
-
- public static enum PolicyStatus {
- ENABLED, DISABLED
- }
-
- @Override
- public String toString() {
- return String.format("{site=\"%s\", name=\"%s\",definition=%s}", this.getSiteId(), this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
deleted file mode 100644
index 7e57f88..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.base.Objects;
-
-public class PublishPartition implements Serializable {
-
- private static final long serialVersionUID = 2524776632955586234L;
-
- private String policyId;
- private String streamId;
- private String publishId;
- private Set<String> columns = new HashSet<>();
-
- @JsonIgnore
- private Set<Object> columnValues = new HashSet<>();
-
- public PublishPartition() {
- }
-
- public PublishPartition(String streamId, String policyId, String publishId, Set<String> columns) {
- this.streamId = streamId;
- this.policyId = policyId;
- this.publishId = publishId;
- if (columns != null) {
- this.columns = columns;
- }
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(streamId).append(policyId).append(publishId).append(columns).append(columnValues).build();
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof PublishPartition
- && Objects.equal(this.streamId, ((PublishPartition) obj).getStreamId())
- && Objects.equal(this.policyId, ((PublishPartition) obj).getPolicyId())
- && Objects.equal(this.publishId, ((PublishPartition) obj).getPublishId())
- && CollectionUtils.isEqualCollection(this.columns, ((PublishPartition) obj).getColumns())
- && CollectionUtils.isEqualCollection(this.columnValues, ((PublishPartition) obj).getColumnValues());
- }
-
- @Override
- public String toString() {
- return String.format("PublishPartition[policyId=%s,streamId=%s,publishId=%s,columns=%s,columnValues=%s]",
- policyId, streamId, publishId, columns, columnValues);
- }
-
- @Override
- public PublishPartition clone() {
- return new PublishPartition(this.streamId, this.policyId, this.publishId, new HashSet<>(this.columns));
- }
-
- public String getPolicyId() {
- return policyId;
- }
-
- public void setPolicyId(String policyId) {
- this.policyId = policyId;
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public void setStreamId(String streamId) {
- this.streamId = streamId;
- }
-
- public String getPublishId() {
- return publishId;
- }
-
- public void setPublishId(String publishId) {
- this.publishId = publishId;
- }
-
- public Set<String> getColumns() {
- return columns;
- }
-
- public void setColumns(Set<String> columns) {
- this.columns = columns;
- }
-
- @JsonIgnore
- public Set<Object> getColumnValues() {
- return columnValues;
- }
-
- @JsonIgnore
- public void setColumnValues(Set<Object> columnValues) {
- this.columnValues = columnValues;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
deleted file mode 100644
index 74a3d69..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * @since Apr 11, 2016.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Publishment {
-
- public static final String STREAM_NAME_DEFAULT = "_default";
-
- private String name;
- private String type;
- private List<String> policyIds;
- private List<String> streamIds;
- private String dedupIntervalMin;
- private List<String> dedupFields;
- private String dedupStateField;
- private String dedupStateCloseValue;
- private OverrideDeduplicatorSpec overrideDeduplicator;
- private Map<String, Object> properties;
- // the class name to extend the IEventSerializer interface
- private String serializer;
-
- private Set<String> partitionColumns = new HashSet<>();
-
- public Set<String> getPartitionColumns() {
- return partitionColumns;
- }
-
- public void setPartitionColumns(Set<String> partitionColumns) {
- this.partitionColumns = partitionColumns;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getDedupStateField() {
- return dedupStateField;
- }
-
- public void setDedupStateField(String dedupStateField) {
- this.dedupStateField = dedupStateField;
- }
-
- public String getDedupStateCloseValue() {
- return dedupStateCloseValue;
- }
-
- public void setDedupStateCloseValue(String dedupStateCloseValue) {
- this.dedupStateCloseValue = dedupStateCloseValue;
- }
-
- public OverrideDeduplicatorSpec getOverrideDeduplicator() {
- return overrideDeduplicator;
- }
-
- public void setOverrideDeduplicator(OverrideDeduplicatorSpec overrideDeduplicator) {
- this.overrideDeduplicator = overrideDeduplicator;
- }
-
- public String getSerializer() {
- return serializer;
- }
-
- public void setSerializer(String serializer) {
- this.serializer = serializer;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public List<String> getPolicyIds() {
- return policyIds;
- }
-
- public void setPolicyIds(List<String> policyIds) {
- this.policyIds = policyIds;
- }
-
- public List<String> getStreamIds() {
- return streamIds;
- }
-
- public void setStreamIds(List<String> streamIds) {
- this.streamIds = streamIds;
- }
-
- public String getDedupIntervalMin() {
- return dedupIntervalMin;
- }
-
- public void setDedupIntervalMin(String dedupIntervalMin) {
- this.dedupIntervalMin = dedupIntervalMin;
- }
-
- public List<String> getDedupFields() {
- return dedupFields;
- }
-
- public void setDedupFields(List<String> dedupFields) {
- this.dedupFields = dedupFields;
- }
-
- public Map<String, Object> getProperties() {
- return properties;
- }
-
- public void setProperties(Map<String, Object> properties) {
- this.properties = properties;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof Publishment) {
- Publishment p = (Publishment) obj;
- return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType())
- && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
- && Objects.equals(dedupFields, p.getDedupFields())
- && Objects.equals(dedupStateField, p.getDedupStateField())
- && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
- && Objects.equals(policyIds, p.getPolicyIds())
- && Objects.equals(streamIds, p.getStreamIds())
- && properties.equals(p.getProperties()));
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields)
- .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds)
- .append(properties).build();
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
- .append(policyIds).append(",properties:").append(properties);
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
deleted file mode 100644
index f7025f2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.util.*;
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PublishmentType {
- private String name;
-
- @Override
- public String toString() {
- return "PublishmentType{"
- + "name='" + name + '\''
- + ", type='" + type + '\''
- + ", description='" + description + '\''
- + ", fields=" + fields
- + '}';
- }
-
- private String type;
- private String description;
- private List<Map<String, String>> fields = new LinkedList<>();
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public List<Map<String, String>> getFields() {
- return fields;
- }
-
- public void setFields(List<Map<String, String>> fields) {
- this.fields = fields;
- }
-
-
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof PublishmentType) {
- PublishmentType p = (PublishmentType) obj;
- return (Objects.equals(name, p.name)
- && Objects.equals(type, p.type)
- && Objects.equals(description, p.getDescription())
- && Objects.equals(fields, p.getFields()));
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder()
- .append(name)
- .append(type)
- .append(description)
- .append(fields)
- .build();
- }
-
-
- public static class Builder {
- private final PublishmentType publishmentType;
-
- public Builder() {
- this.publishmentType = new PublishmentType();
- }
-
- public Builder type(Class<?> typeClass) {
- this.publishmentType.setType(typeClass.getName());
- return this;
- }
-
- public Builder name(String name) {
- this.publishmentType.setName(name);
- return this;
- }
-
- public Builder description(String description) {
- this.publishmentType.setDescription(description);
- return this;
- }
-
- public Builder field(Map<String,String> fieldDesc) {
- this.publishmentType.getFields().add(fieldDesc);
- return this;
- }
-
- public Builder field(String name, String value) {
- this.publishmentType.getFields().add(new HashMap<String,String>() {
- {
- put("name", name);
- put("value", value);
- }
- });
- return this;
- }
-
- public Builder field(String name) {
- this.publishmentType.getFields().add(new HashMap<String,String>() {
- {
- put("name", name);
- }
- });
- return this;
- }
-
- public PublishmentType build() {
- return this.publishmentType;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
deleted file mode 100644
index abd9dc5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Objects;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-public class StreamColumn implements Serializable {
-
- private static final long serialVersionUID = -5457861313624389106L;
- private String name;
- private Type type;
- private Object defaultValue;
- private boolean required = true;
- private String description;
- private String nodataExpression;
-
- public String toString() {
- return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]",
- name, type, defaultValue, required, nodataExpression);
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder()
- .append(this.name)
- .append(this.type)
- .append(this.defaultValue)
- .append(this.required)
- .append(this.description)
- .append(this.nodataExpression)
- .build();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof StreamColumn)) {
- return false;
- }
- return Objects.equals(this.name, ((StreamColumn) obj).name)
- && Objects.equals(this.type, ((StreamColumn) obj).type)
- && Objects.equals(this.defaultValue, ((StreamColumn) obj).defaultValue)
- && Objects.equals(this.required, ((StreamColumn) obj).required)
- && Objects.equals(this.description, ((StreamColumn) obj).description)
- && Objects.equals(this.nodataExpression, ((StreamColumn) obj).nodataExpression);
- }
-
- public String getNodataExpression() {
- return nodataExpression;
- }
-
- public void setNodataExpression(String nodataExpression) {
- this.nodataExpression = nodataExpression;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- @XmlJavaTypeAdapter(StreamColumnTypeAdapter.class)
- public Type getType() {
- return type;
- }
-
- public void setType(Type type) {
- this.type = type;
- }
-
- @XmlJavaTypeAdapter(value = DefaultValueAdapter.class)
- public Object getDefaultValue() {
- return defaultValue;
- }
-
- private void ensureDefaultValueType() {
- if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) {
- switch (this.getType()) {
- case INT:
- this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue()));
- break;
- case LONG:
- this.setDefaultValue(Long.valueOf((String) this.getDefaultValue()));
- break;
- case FLOAT:
- this.setDefaultValue(Float.valueOf((String) this.getDefaultValue()));
- break;
- case DOUBLE:
- this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
- break;
- case BOOL:
- this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue()));
- break;
- case OBJECT:
- try {
- this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class));
- } catch (IOException e) {
- throw new IllegalArgumentException(e);
- }
- break;
- default:
- throw new IllegalArgumentException("Illegal type: " + this.getType());
- }
- }
- }
-
- public void setDefaultValue(Object defaultValue) {
- this.defaultValue = defaultValue;
- ensureDefaultValueType();
- }
-
- public boolean isRequired() {
- return required;
- }
-
- public void setRequired(boolean required) {
- this.required = required;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public enum Type implements Serializable {
- STRING("string"), INT("int"), LONG("long"), FLOAT("float"), DOUBLE("double"), BOOL("bool"), OBJECT("object");
-
- private final String name;
-
- Type(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- @com.fasterxml.jackson.annotation.JsonCreator
- public static Type getEnumFromValue(String value) {
- for (Type testEnum : values()) {
- if (testEnum.name.equalsIgnoreCase(value)) {
- return testEnum;
- }
- }
- throw new IllegalArgumentException();
- }
- }
-
- public static class StreamColumnTypeAdapter extends XmlAdapter<String, Type> {
-
- @Override
- public Type unmarshal(String v) throws Exception {
- return Type.getEnumFromValue(v);
- }
-
- @Override
- public String marshal(Type v) throws Exception {
- return v.name;
- }
- }
-
- public static class DefaultValueAdapter extends XmlAdapter<String, Object> {
- @Override
- public Object unmarshal(String v) throws Exception {
- return v;
- }
-
- @Override
- public String marshal(Object v) throws Exception {
- return v.toString();
- }
- }
-
- public static class Builder {
- private StreamColumn column;
-
- public Builder() {
- column = new StreamColumn();
- }
-
- public Builder name(String name) {
- column.setName(name);
- return this;
- }
-
- public Builder type(Type type) {
- column.setType(type);
- return this;
- }
-
- public Builder defaultValue(Object defaultValue) {
- column.setDefaultValue(defaultValue);
- return this;
- }
-
- public Builder required(boolean required) {
- column.setRequired(required);
- return this;
- }
-
- public StreamColumn build() {
- return column;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
deleted file mode 100644
index af9d137..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlElementWrapper;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * This is actually a data source schema.
- *
- * @since Apr 5, 2016
- */
-public class StreamDefinition implements Serializable {
- private static final long serialVersionUID = 2352202882328931825L;
-
- // Stream unique ID
- private String streamId;
-
- // Stream description
- private String description;
-
- // Is validateable or not
- private boolean validate = true;
-
- // Is timeseries-based stream or not
- private boolean timeseries;
-
- // TODO: Decouple dataSource and siteId from stream definition
-
- // Stream data source ID
- private String dataSource;
-
- private String group = "global";
-
- //
- private String streamSource;
-
- // Tenant (Site) ID
- private String siteId;
-
- private List<StreamColumn> columns = new ArrayList<>();
-
- public String toString() {
- return String.format("StreamDefinition[group=%s, streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s",
- group,
- streamId,
- dataSource,
- description,
- validate,
- timeseries,
- columns
- );
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder()
- .append(this.streamId)
- .append(this.group)
- .append(this.description)
- .append(this.validate)
- .append(this.timeseries)
- .append(this.dataSource)
- .append(streamSource)
- .append(this.siteId)
- .append(this.columns)
- .build();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof StreamDefinition)) {
- return false;
- }
- StreamDefinition streamDefinition = (StreamDefinition) obj;
- return Objects.equals(this.streamId, streamDefinition.streamId)
- && Objects.equals(this.group, streamDefinition.group)
- && Objects.equals(this.description, streamDefinition.description)
- && Objects.equals(this.validate, streamDefinition.validate)
- && Objects.equals(this.timeseries, streamDefinition.timeseries)
- && Objects.equals(this.dataSource, streamDefinition.dataSource)
- && Objects.equals(this.streamSource, streamDefinition.streamSource)
- && Objects.equals(this.siteId, streamDefinition.siteId)
- && CollectionUtils.isEqualCollection(this.columns, streamDefinition.columns);
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public void setStreamId(String streamId) {
- this.streamId = streamId;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- @Deprecated
- public boolean isValidate() {
- return validate;
- }
-
- public void setValidate(boolean validate) {
- this.validate = validate;
- }
-
- public boolean isTimeseries() {
- return timeseries;
- }
-
- public void setTimeseries(boolean timeseries) {
- this.timeseries = timeseries;
- }
-
- @XmlElementWrapper(name = "columns")
- @XmlElement(name = "column")
- public List<StreamColumn> getColumns() {
- return columns;
- }
-
- public void setColumns(List<StreamColumn> columns) {
- this.columns = columns;
- }
-
- public String getDataSource() {
- return dataSource;
- }
-
- public void setDataSource(String dataSource) {
- this.dataSource = dataSource;
- }
-
- public int getColumnIndex(String column) {
- int i = 0;
- for (StreamColumn col : this.getColumns()) {
- if (col.getName().equals(column)) {
- return i;
- }
- i++;
- }
- return -1;
- }
-
- public String getSiteId() {
- return siteId;
- }
-
- public void setSiteId(String siteId) {
- this.siteId = siteId;
- }
-
- public String getStreamSource() {
- return streamSource;
- }
-
- public void setStreamSource(String streamSource) {
- this.streamSource = streamSource;
- }
-
- public StreamDefinition copy() {
- StreamDefinition copied = new StreamDefinition();
- copied.setColumns(this.getColumns());
- copied.setDataSource(this.getDataSource());
- copied.setDescription(this.getDescription());
- copied.setSiteId(this.getSiteId());
- copied.setStreamId(this.getStreamId());
- copied.setGroup(this.getGroup());
- copied.setTimeseries(this.isTimeseries());
- copied.setValidate(this.isValidate());
- return copied;
- }
-
- public String getGroup() {
- return group;
- }
-
- public void setGroup(String group) {
- this.group = group;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
deleted file mode 100644
index 0987463..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import java.io.Serializable;
-import java.util.*;
-
-/**
- * StreamPartition defines how a data stream is partitioned and sorted
- * streamId is used for distinguishing different streams which are spawned from the same data source
- * type defines how to partition data among slots within one slotqueue
- * columns are fields based on which stream is grouped
- * sortSpec defines how data is sorted.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class StreamPartition implements Serializable {
- private static final long serialVersionUID = -3361648309136926040L;
-
- private String streamId;
- private Type type;
- private List<String> columns = new ArrayList<>();
- private StreamSortSpec sortSpec;
-
- public StreamPartition() {
- }
-
- public StreamPartition(StreamPartition o) {
- this.streamId = o.streamId;
- this.type = o.type;
- this.columns = new ArrayList<String>(o.columns);
- this.sortSpec = o.sortSpec == null ? null : new StreamSortSpec(o.sortSpec);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) {
- return true;
- }
- if (!(other instanceof StreamPartition)) {
- return false;
- }
- StreamPartition sp = (StreamPartition) other;
- return Objects.equals(streamId, sp.streamId) && Objects.equals(type, sp.type)
- && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec);
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(streamId).append(type).append(columns).append(sortSpec).build();
- }
-
- public void setType(Type type) {
- this.type = type;
- }
-
- public Type getType() {
- return this.type;
- }
-
- public enum Type {
- GLOBAL("GLOBAL", 0), GROUPBY("GROUPBY", 1), SHUFFLE("SHUFFLE", 2);
- private final String name;
- private final int index;
-
- Type(String name, int index) {
- this.name = name;
- this.index = index;
- }
-
- @Override
- public String toString() {
- return this.name;
- }
-
- public static Type locate(String type) {
- Type _type = _NAME_TYPE.get(type.toUpperCase());
- if (_type == null) {
- throw new IllegalStateException("Illegal type name: " + type);
- }
- return _type;
- }
-
- public static Type locate(int index) {
- Type _type = _INDEX_TYPE.get(index);
- if (_type == null) {
- throw new IllegalStateException("Illegal type index: " + index);
- }
- return _type;
- }
-
- private static final Map<String, Type> _NAME_TYPE = new HashMap<>();
- private static final Map<Integer, Type> _INDEX_TYPE = new TreeMap<>();
-
- static {
- _NAME_TYPE.put(GLOBAL.name, GLOBAL);
- _NAME_TYPE.put(GROUPBY.name, GROUPBY);
- _NAME_TYPE.put(SHUFFLE.name, SHUFFLE);
-
- _INDEX_TYPE.put(GLOBAL.index, GLOBAL);
- _INDEX_TYPE.put(GROUPBY.index, GLOBAL);
- _INDEX_TYPE.put(SHUFFLE.index, GLOBAL);
- }
- }
-
- public List<String> getColumns() {
- return columns;
- }
-
- public void setColumns(List<String> columns) {
- this.columns = columns;
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public void setStreamId(String streamId) {
- this.streamId = streamId;
- }
-
- public StreamSortSpec getSortSpec() {
- return sortSpec;
- }
-
- public void setSortSpec(StreamSortSpec sortSpec) {
- this.sortSpec = sortSpec;
- }
-
- @Override
- public String toString() {
- return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]", this.getStreamId(), this.getType(), StringUtils.join(this.getColumns(), ","), sortSpec);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
deleted file mode 100644
index ff05fc8..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.joda.time.Period;
-
-import java.io.Serializable;
-
-/**
- * Sort window settings: a window period, stored as a period string, and a
- * window margin. Original note: streamId is the key.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class StreamSortSpec implements Serializable {
-    private static final long serialVersionUID = 3626506441441584937L;
-    private String windowPeriod = "";
-    private int windowMargin = 30 * 1000; // 30 seconds by default
-
-    public StreamSortSpec() {
-    }
-
-    /**
-     * Copy constructor.
-     *
-     * @param spec spec whose period and margin are copied
-     */
-    public StreamSortSpec(StreamSortSpec spec) {
-        this.windowPeriod = spec.windowPeriod;
-        this.windowMargin = spec.windowMargin;
-    }
-
-    public String getWindowPeriod() {
-        return windowPeriod;
-    }
-
-    /**
-     * Returns the window period in milliseconds.
-     *
-     * @return the parsed period converted by {@code TimePeriodUtils}, or 0 when
-     *         the period string is blank
-     */
-    public int getWindowPeriodMillis() {
-        if (StringUtils.isNotBlank(windowPeriod)) {
-            return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
-        } else {
-            return 0;
-        }
-    }
-
-    public void setWindowPeriod(String windowPeriod) {
-        this.windowPeriod = windowPeriod;
-    }
-
-    /** Stores the given number of milliseconds as a period string. */
-    public void setWindowPeriodMillis(int windowPeriodMillis) {
-        this.windowPeriod = Period.millis(windowPeriodMillis).toString();
-    }
-
-    /** Stores the given period in its string form. */
-    public void setWindowPeriod2(Period period) {
-        this.windowPeriod = period.toString();
-    }
-
-
-    public int getWindowMargin() {
-        return windowMargin;
-    }
-
-    public void setWindowMargin(int windowMargin) {
-        this.windowMargin = windowMargin;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-            .append(windowPeriod)
-            .append(windowMargin)
-            .toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object that) {
-        if (this == that) {
-            return true;
-        }
-        if (!(that instanceof StreamSortSpec)) {
-            return false;
-        }
-
-        StreamSortSpec another = (StreamSortSpec) that;
-        // windowPeriod can be set to null through the setter; StringUtils.equals
-        // compares null-safely, matching what hashCode() already tolerates.
-        return another.windowMargin == this.windowMargin
-            && StringUtils.equals(another.windowPeriod, this.windowPeriod);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
-            this.getWindowPeriod(),
-            this.getWindowMargin());
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
deleted file mode 100644
index 1e40309..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.Map;
-
-/**
- * Describes a streaming cluster: name, zone, engine type, description and
- * deployment properties.
- *
- * @since Apr 5, 2016.
- */
-public class StreamingCluster {
-    public static final String NIMBUS_HOST = "nimbusHost";
-    public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort";
-
-    /** Streaming engine types. */
-    public enum StreamingType {
-        STORM
-    }
-
-    @JsonProperty
-    private String name;
-    @JsonProperty
-    private String zone;
-    @JsonProperty
-    private StreamingType type;
-    @JsonProperty
-    private String description;
-    /**
-     * key - nimbus for storm.
-     */
-    @JsonProperty
-    private Map<String, String> deployments;
-
-    public String getName() {
-        return this.name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getZone() {
-        return this.zone;
-    }
-
-    public void setZone(String zone) {
-        this.zone = zone;
-    }
-
-    public StreamingType getType() {
-        return this.type;
-    }
-
-    public void setType(StreamingType type) {
-        this.type = type;
-    }
-
-    public String getDescription() {
-        return this.description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public Map<String, String> getDeployments() {
-        return this.deployments;
-    }
-
-    public void setDeployments(Map<String, String> deployments) {
-        this.deployments = deployments;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
deleted file mode 100644
index a794e49..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.model;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.common.DateTimeUtil;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Use as final rich alert event.
- *
- * <p>Built from an {@link AlertStreamEvent} by
- * {@link #createAlertPublishEvent(AlertStreamEvent)}.
- */
-public class AlertPublishEvent {
-    private String alertId;
-    private String siteId;
-    private List<String> appIds;
-    private String policyId;
-    private String policyValue;
-    private long alertTimestamp;
-    private Map<String, Object> alertData;
-    private String alertSubject;
-    private String alertBody;
-    private String streamId;
-    private String createdBy;
-    private long createdTime;
-
-    public static final String ALERT_ID_KEY = "alertId";
-    public static final String SITE_ID_KEY = "siteId";
-    public static final String APP_IDS_KEY = "appIds";
-    public static final String POLICY_ID_KEY = "policyId";
-    public static final String POLICY_VALUE_KEY = "policyValue";
-    public static final String ALERT_CATEGORY = "category";
-    public static final String ALERT_SEVERITY = "severity";
-
-    public String getAlertId() {
-        return alertId;
-    }
-
-    public void setAlertId(String alertId) {
-        this.alertId = alertId;
-    }
-
-    public List<String> getAppIds() {
-        return appIds;
-    }
-
-    public void setAppIds(List<String> appIds) {
-        this.appIds = appIds;
-    }
-
-    public String getPolicyValue() {
-        return policyValue;
-    }
-
-    public void setPolicyValue(String policyValue) {
-        this.policyValue = policyValue;
-    }
-
-    public long getAlertTimestamp() {
-        return alertTimestamp;
-    }
-
-    public void setAlertTimestamp(long alertTimestamp) {
-        this.alertTimestamp = alertTimestamp;
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-
-
-    public String getPolicyId() {
-        return policyId;
-    }
-
-    public void setPolicyId(String policyId) {
-        this.policyId = policyId;
-    }
-
-    public Map<String, Object> getAlertData() {
-        return alertData;
-    }
-
-    public void setAlertData(Map<String, Object> alertData) {
-        this.alertData = alertData;
-    }
-
-    /**
-     * Builds the publishable form of an alert stream event.
-     *
-     * <p>Identifiers, subject, body and the data map are taken from the event;
-     * site id, policy value and app ids are taken from the event context when
-     * the context holds a non-null value under the corresponding key.
-     *
-     * @param event source alert event
-     * @return a new publish event populated from {@code event}
-     * @throws NullPointerException if {@code event} or its alert id is {@code null}
-     */
-    public static AlertPublishEvent createAlertPublishEvent(AlertStreamEvent event) {
-        // Template form: the event is only rendered to text if the check fails.
-        Preconditions.checkNotNull(event.getAlertId(), "alertId is not initialized before being published: %s", event);
-        AlertPublishEvent alertEvent = new AlertPublishEvent();
-        alertEvent.setAlertId(event.getAlertId());
-        alertEvent.setPolicyId(event.getPolicyId());
-        // NOTE(review): the alert timestamp is filled from the created time, not from
-        // the event timestamp - confirm this is intended.
-        alertEvent.setAlertTimestamp(event.getCreatedTime());
-        alertEvent.setStreamId(event.getStreamId());
-        alertEvent.setCreatedBy(event.getCreatedBy());
-        alertEvent.setCreatedTime(event.getCreatedTime());
-        alertEvent.setAlertSubject(event.getSubject());
-        alertEvent.setAlertBody(event.getBody());
-
-        Map<String, Object> context = event.getContext();
-        if (context != null && !context.isEmpty()) {
-            // A key that is present with a null value is treated like an absent key
-            // instead of failing on toString().
-            Object contextSiteId = context.get(SITE_ID_KEY);
-            if (contextSiteId != null) {
-                alertEvent.setSiteId(contextSiteId.toString());
-            }
-            Object contextPolicyValue = context.get(POLICY_VALUE_KEY);
-            if (contextPolicyValue != null) {
-                alertEvent.setPolicyValue(contextPolicyValue.toString());
-            }
-            // The context is untyped; the producer is expected to store a List<String> here.
-            @SuppressWarnings("unchecked")
-            List<String> contextAppIds = (List<String>) context.get(APP_IDS_KEY);
-            if (contextAppIds != null) {
-                alertEvent.setAppIds(contextAppIds);
-            }
-        }
-        alertEvent.setAlertData(event.getDataMap());
-        return alertEvent;
-    }
-
-    @Override
-    public String toString() {
-        // alertData is passed as-is so that a missing map prints "null" instead of throwing.
-        return String.format("%s %s alertId=%s, siteId=%s, policyId=%s, alertData=%s",
-            DateTimeUtil.millisecondsToHumanDateWithSeconds(alertTimestamp),
-            DateTimeUtil.CURRENT_TIME_ZONE.getID(),
-            alertId,
-            siteId,
-            policyId,
-            alertData);
-    }
-
-    public String getAlertSubject() {
-        return alertSubject;
-    }
-
-    public void setAlertSubject(String alertSubject) {
-        this.alertSubject = alertSubject;
-    }
-
-    public String getAlertBody() {
-        return alertBody;
-    }
-
-    public void setAlertBody(String alertBody) {
-        this.alertBody = alertBody;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public String getCreatedBy() {
-        return createdBy;
-    }
-
-    public void setCreatedBy(String createdBy) {
-        this.createdBy = createdBy;
-    }
-
-    public long getCreatedTime() {
-        return createdTime;
-    }
-
-    public void setCreatedTime(long createdTime) {
-        this.createdTime = createdTime;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
deleted file mode 100644
index 00170df..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import org.apache.eagle.alert.engine.coordinator.AlertSeverity;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.*;
-
-/**
- * streamId stands for alert type instead of source event streamId.
- */
-public class AlertStreamEvent extends StreamEvent {
- private static final long serialVersionUID = 2392131134670106397L;
-
- private String siteId;
- private String alertId;
- private String policyId;
- private StreamDefinition schema;
- private String createdBy;
- private long createdTime;
- private String category;
- private AlertSeverity severity = AlertSeverity.WARNING;
-
- // ----------------------
- // Lazy Alert Fields
- // ----------------------
-
- // Dynamical context like app related fields
- private Map<String, Object> context;
- // Alert content like subject and body
- private String subject;
- private String body;
-
- public AlertStreamEvent() {
- }
-
- public AlertStreamEvent(AlertStreamEvent event) {
- this.siteId = event.getSiteId();
- this.alertId = event.getAlertId();
- this.policyId = event.policyId;
- this.schema = event.schema;
- this.createdBy = event.createdBy;
- this.createdTime = event.createdTime;
- this.setTimestamp(event.getTimestamp());
- this.setData(new Object[event.data.length]);
- System.arraycopy(event.data, 0, this.data, 0, event.data.length);
- this.setStreamId(event.getStreamId());
- this.setMetaVersion(event.getMetaVersion());
- }
-
- public void setPolicyId(String policyId) {
- this.policyId = policyId;
- }
-
- public String getPolicyId() {
- return policyId;
- }
-
- @Override
- public String toString() {
- List<String> dataStrings = new ArrayList<>(this.getData().length);
- for (Object obj : this.getData()) {
- if (obj != null) {
- dataStrings.add(obj.toString());
- } else {
- dataStrings.add(null);
- }
- }
-
- return String.format("Alert {site=%s, stream=%s,timestamp=%s,data=%s, policyId=%s, createdBy=%s, metaVersion=%s}",
- this.getSiteId(),
- this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
- this.getDataMap(), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion());
- }
-
- public String getCreatedBy() {
- return createdBy;
- }
-
- public void setCreatedBy(String createdBy) {
- this.createdBy = createdBy;
- }
-
- public StreamDefinition getSchema() {
- return schema;
- }
-
- public void setSchema(StreamDefinition schema) {
- this.schema = schema;
- }
-
- public long getCreatedTime() {
- return createdTime;
- }
-
- public void setCreatedTime(long createdTime) {
- this.createdTime = createdTime;
- }
-
- public Map<String, Object> getDataMap() {
- Map<String, Object> event = new HashMap<>();
- for (StreamColumn column : schema.getColumns()) {
- Object obj = this.getData()[schema.getColumnIndex(column.getName())];
- if (obj == null) {
- event.put(column.getName(), null);
- continue;
- }
- event.put(column.getName(), obj);
- }
- return event;
- }
-
- public Map<String, Object> getContext() {
- return context;
- }
-
- public void setContext(Map<String, Object> context) {
- this.context = context;
- }
-
- public String getAlertId() {
- ensureAlertId();
- return alertId;
- }
-
- public void ensureAlertId() {
- if (this.alertId == null) {
- this.alertId = UUID.randomUUID().toString();
- }
- }
-
- public String getSubject() {
- return subject;
- }
-
- public void setSubject(String subject) {
- this.subject = subject;
- }
-
- public String getBody() {
- return body;
- }
-
- public void setBody(String body) {
- this.body = body;
- }
-
- public String getCategory() {
- return category;
- }
-
- public void setCategory(String category) {
- this.category = category;
- }
-
- public AlertSeverity getSeverity() {
- return severity;
- }
-
- public void setSeverity(AlertSeverity severity) {
- this.severity = severity;
- }
-
- public String getSiteId() {
- return siteId;
- }
-
- public void setSiteId(String siteId) {
- this.siteId = siteId;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
deleted file mode 100644
index ecca0ff..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import backtype.storm.tuple.Tuple;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * This is a critical data structure across spout, router bolt and alert bolt
- * partition[StreamPartition] defines how one incoming data stream is partitioned, sorted
- * partitionKey[long] is java hash value of groupby fields. The groupby fields are defined in StreamPartition
- * event[StreamEvent] is actual data.
- */
-public class PartitionedEvent implements Serializable {
- private static final long serialVersionUID = -3840016190614238593L;
- private StreamPartition partition;
- private long partitionKey;
- private StreamEvent event;
-
- /**
- * Used for bolt-internal but not inter-bolts,
- * will not pass across bolts.
- */
- private transient Tuple anchor;
-
- public PartitionedEvent() {
- this.event = null;
- this.partition = null;
- this.partitionKey = 0L;
- }
-
- public PartitionedEvent(StreamEvent event, StreamPartition partition, int partitionKey) {
- this.event = event;
- this.partition = partition;
- this.partitionKey = partitionKey;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (obj instanceof PartitionedEvent) {
- PartitionedEvent another = (PartitionedEvent) obj;
- return !(this.partitionKey != another.getPartitionKey()
- || !Objects.equals(this.event, another.getEvent())
- || !Objects.equals(this.partition, another.getPartition())
- || !Objects.equals(this.anchor, another.anchor));
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder()
- .append(partitionKey)
- .append(event)
- .append(partition)
- .append(anchor)
- .build();
- }
-
- public StreamEvent getEvent() {
- return event;
- }
-
- public void setEvent(StreamEvent event) {
- this.event = event;
- }
-
- public StreamPartition getPartition() {
- return partition;
- }
-
- public void setPartition(StreamPartition partition) {
- this.partition = partition;
- }
-
- public void setPartitionKey(long partitionKey) {
- this.partitionKey = partitionKey;
- }
-
- public long getPartitionKey() {
- return this.partitionKey;
- }
-
- public String toString() {
- return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event, partitionKey);
- }
-
- public long getTimestamp() {
- return (event != null) ? event.getTimestamp() : 0L;
- }
-
- public String getStreamId() {
- return (event != null) ? event.getStreamId() : null;
- }
-
- public Object[] getData() {
- return event != null ? event.getData() : null;
- }
-
- public boolean isSortRequired() {
- return isPartitionRequired() && this.getPartition().getSortSpec() != null;
- }
-
- public boolean isPartitionRequired() {
- return this.getPartition() != null;
- }
-
- public PartitionedEvent copy() {
- PartitionedEvent copied = new PartitionedEvent();
- copied.setEvent(this.getEvent());
- copied.setPartition(this.partition);
- copied.setPartitionKey(this.partitionKey);
- return copied;
- }
-
- public Tuple getAnchor() {
- return anchor;
- }
-
- public void setAnchor(Tuple anchor) {
- this.anchor = anchor;
- }
-
- public PartitionedEvent withAnchor(Tuple tuple) {
- this.setAnchor(tuple);
- return this;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
deleted file mode 100644
index 130985f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.common.DateTimeUtil;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * A single event of a stream: the stream id, an event timestamp, the positional data values
- * and an optional metadata version.
- *
- * <p>Instances are mutable and hold the {@code data} array by reference; neither the setters
- * nor {@link #copy()} / {@link #copyFrom(StreamEvent)} clone the array.
- *
- * @since Apr 5, 2016.
- */
-public class StreamEvent implements Serializable {
-    private static final long serialVersionUID = 2765116509856609763L;
-
-    protected String streamId;
-    protected Object[] data;
-    protected long timestamp;
-    protected String metaVersion;
-
-    /** Creates an event with all fields at their Java defaults. */
-    public StreamEvent() {
-    }
-
-    /**
-     * @param streamId  id of the stream the event belongs to
-     * @param timestamp event timestamp; rendered as epoch milliseconds by {@link #toString()}
-     * @param data      positional values, stored by reference
-     */
-    public StreamEvent(String streamId, long timestamp, Object[] data) {
-        this.setStreamId(streamId);
-        this.setTimestamp(timestamp);
-        this.setData(data);
-    }
-
-    /**
-     * @param streamId    id of the stream the event belongs to
-     * @param timestamp   event timestamp; rendered as epoch milliseconds by {@link #toString()}
-     * @param data        positional values, stored by reference
-     * @param metaVersion metadata version tag
-     */
-    public StreamEvent(String streamId, long timestamp, Object[] data, String metaVersion) {
-        this.setStreamId(streamId);
-        this.setTimestamp(timestamp);
-        this.setData(data);
-        this.setMetaVersion(metaVersion);
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    /** @param data positional values; the array is stored by reference, not copied */
-    public void setData(Object[] data) {
-        this.data = data;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public String getMetaVersion() {
-        return metaVersion;
-    }
-
-    public void setMetaVersion(String metaVersion) {
-        this.metaVersion = metaVersion;
-    }
-
-    /** Hashes stream id, timestamp, data and meta version — the fields compared by equals. */
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(streamId).append(timestamp).append(data).append(metaVersion).build();
-    }
-
-    /**
-     * Events are equal when stream id, timestamp and meta version match and the data arrays
-     * are deeply equal.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj instanceof StreamEvent) {
-            StreamEvent another = (StreamEvent) obj;
-            return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp
-                && Arrays.deepEquals(this.data, another.data) && Objects.equals(this.metaVersion, another.metaVersion);
-        }
-        return false;
-    }
-
-    /**
-     * Renders the event for diagnostics: stream id, human-readable timestamp, the data values
-     * joined by commas, and the meta version. A {@code null} data array renders as empty.
-     */
-    @Override
-    public String toString() {
-        List<String> dataStrings = new ArrayList<>();
-        if (this.getData() != null) {
-            for (Object obj : this.getData()) {
-                // null elements are kept as null entries and left to StringUtils.join to render.
-                dataStrings.add(obj != null ? obj.toString() : null);
-            }
-        }
-        // Use %s, not %S: the upper-case conversion would upper-case the stream id in the output.
-        return String.format("StreamEvent[stream=%s,timestamp=%s,data=[%s],metaVersion=%s]",
-            this.getStreamId(),
-            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
-            StringUtils.join(dataStrings, ","),
-            this.getMetaVersion());
-    }
-
-    /** @return a new, empty {@link StreamEventBuilder} */
-    public static StreamEventBuilder builder() {
-        return new StreamEventBuilder();
-    }
-
-    /**
-     * Creates a new event carrying this event's field values.
-     *
-     * <p>The copy is shallow: the new event references the same {@code data} array.
-     *
-     * @return cloned new event object.
-     */
-    public StreamEvent copy() {
-        StreamEvent newEvent = new StreamEvent();
-        newEvent.setTimestamp(this.getTimestamp());
-        newEvent.setData(this.getData());
-        newEvent.setStreamId(this.getStreamId());
-        newEvent.setMetaVersion(this.getMetaVersion());
-        return newEvent;
-    }
-
-    /**
-     * Overwrites this event's fields with those of {@code event} (shallow: the data array is
-     * shared afterwards).
-     *
-     * @param event the event to copy from; must not be {@code null}
-     */
-    public void copyFrom(StreamEvent event) {
-        this.setTimestamp(event.getTimestamp());
-        this.setData(event.getData());
-        this.setStreamId(event.getStreamId());
-        this.setMetaVersion(event.getMetaVersion());
-    }
-
-    /** @return the stored data array itself (not a copy); may be {@code null} */
-    public Object[] getData() {
-        return data;
-    }
-
-    /**
-     * Selects the values of the named columns, in the order the names are given.
-     *
-     * <p>Each name is resolved to a position with
-     * {@link StreamDefinition#getColumnIndex(String)} and used to index the data array
-     * directly; no check is made here that the name exists or that data is present.
-     *
-     * @param streamDefinition schema used to resolve column names to positions
-     * @param column           names of the columns to select
-     * @return a new array holding the selected values
-     */
-    public Object[] getData(StreamDefinition streamDefinition, List<String> column) {
-        ArrayList<Object> result = new ArrayList<>(column.size());
-        for (String colName : column) {
-            result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
-        }
-        return result.toArray();
-    }
-
-    /**
-     * Varargs form of {@link #getData(StreamDefinition, List)}.
-     *
-     * @param streamDefinition schema used to resolve column names to positions
-     * @param column           names of the columns to select
-     * @return a new array holding the selected values
-     */
-    public Object[] getData(StreamDefinition streamDefinition, String... column) {
-        // Share one implementation instead of duplicating the lookup loop.
-        return getData(streamDefinition, Arrays.asList(column));
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
deleted file mode 100644
index 53101ef..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class StreamEventBuilder {
- private static final Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);
-
- private StreamEvent instance;
- private StreamDefinition streamDefinition;
-
- public StreamEventBuilder() {
- instance = new StreamEvent();
- }
-
- public StreamEventBuilder schema(StreamDefinition streamDefinition) {
- this.streamDefinition = streamDefinition;
- if (instance.getStreamId() == null) {
- instance.setStreamId(streamDefinition.getStreamId());
- }
- return this;
- }
-
- public StreamEventBuilder streamId(String streamId) {
- instance.setStreamId(streamId);
- return this;
- }
-
- public StreamEventBuilder attributes(Map<String, Object> data, StreamDefinition streamDefinition) {
- this.schema(streamDefinition);
- List<StreamColumn> columnList = streamDefinition.getColumns();
- if (columnList != null && columnList.size() > 0) {
- List<Object> values = new ArrayList<>(columnList.size());
- for (StreamColumn column : columnList) {
- values.add(data.getOrDefault(column.getName(), column.getDefaultValue()));
- }
- instance.setData(values.toArray());
- } else if (LOG.isDebugEnabled()) {
- LOG.warn("All data [{}] are ignored as no columns defined in schema {}", data, streamDefinition);
- }
- return this;
- }
-
- public StreamEventBuilder attributes(Map<String, Object> data) {
- return attributes(data, this.streamDefinition);
- }
-
- public StreamEventBuilder attributes(Object... data) {
- instance.setData(data);
- return this;
- }
-
- public StreamEventBuilder timestamep(long timestamp) {
- instance.setTimestamp(timestamp);
- return this;
- }
-
- public StreamEventBuilder metaVersion(String metaVersion) {
- instance.setMetaVersion(metaVersion);
- return this;
- }
-
- public StreamEvent build() {
- if (instance.getStreamId() == null) {
- throw new IllegalArgumentException("streamId is null of event: " + instance);
- }
- return instance;
- }
-
- public StreamEventBuilder copyFrom(StreamEvent event) {
- this.instance.copyFrom(event);
- return this;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
deleted file mode 100644
index 461a23c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metric;
-
-import org.apache.eagle.alert.metric.sink.MetricSink;
-import org.apache.eagle.alert.metric.source.MetricSource;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-import java.util.Map;
-
-/**
- * Contract of a metric system: it has a start/stop lifecycle, accepts metric sources and
- * metric sinks, can report on a schedule or on demand, and exposes a
- * {@link MetricRegistry}.
- *
- * <p>NOTE(review): only the method signatures are visible here; the exact behavior of each
- * operation is defined by the implementations.
- */
-public interface IMetricSystem {
-
- /**
- * Initialize.
- */
- void start();
-
- /**
- * Schedule reporter.
- */
- void schedule();
-
- /**
- * Close and stop all resources and services.
- */
- void stop();
-
- /**
- * Manual report metric.
- */
- void report();
-
- /**
- * Registers a metric sink together with its configuration.
- *
- * @param sink metric sink.
- * @param config configuration passed along with the sink — presumably the sink's own
- *               settings; confirm against implementations.
- */
- void register(MetricSink sink, Config config);
-
- /**
- * Registers a metric source.
- *
- * @param source metric source.
- */
- void register(MetricSource source);
-
- /**
- * Supplies tags to the metric system.
- *
- * @param metricTags tag names mapped to tag values — presumably attached to reported
- *                   metrics; confirm against implementations.
- */
- void tags(Map<String, Object> metricTags);
-
- /**
- * @return the {@link MetricRegistry} of this metric system.
- */
- MetricRegistry registry();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
deleted file mode 100644
index 81aa75c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.metric;
-
-/**
- * String constants naming metric-related configuration entries.
- *
- * <p>NOTE(review): the consumers of these names are not visible in this file; the
- * descriptions below are derived from the constant names and values only.
- */
-public class MetricConfigs {
- // Name of the metric prefix setting.
- public static final String METRIC_PREFIX_CONF = "metric.prefix";
- // Name of the metric sink setting.
- public static final String METRIC_SINK_CONF = "metric.sink";
- // Name of the duration setting; per the key name the value is expressed in seconds.
- public static final String DURATION_SECONDS_CONF = "metric.durationSeconds";
- // Field name under which tags are stored.
- public static final String TAGS_FIELD_NAME = "tags";
-}
\ No newline at end of file