You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:18 UTC
[17/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle
problems on eagle-alert module and enable failOnViolation
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
index 8867fd7..4627bef 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
@@ -16,49 +16,46 @@
*/
package org.apache.eagle.alert.coordination.model;
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Strings;
-
/**
* Convert incoming tuple to stream
* incoming tuple consists of 2 fields, topic and map of key/value
- * output stream consists of 3 fields, stream name, timestamp, and map of key/value
+ * output stream consists of 3 fields, stream name, timestamp, and map of key/value.
*/
public class Tuple2StreamConverter {
private static final Logger LOG = LoggerFactory.getLogger(Tuple2StreamConverter.class);
private Tuple2StreamMetadata metadata;
private StreamNameSelector cachedSelector;
- public Tuple2StreamConverter(Tuple2StreamMetadata metadata){
+
+ public Tuple2StreamConverter(Tuple2StreamMetadata metadata) {
this.metadata = metadata;
try {
- cachedSelector = (StreamNameSelector)Class.forName(metadata.getStreamNameSelectorCls()).
- getConstructor(Properties.class).
- newInstance(metadata.getStreamNameSelectorProp());
- }catch(Exception ex){
+ cachedSelector = (StreamNameSelector) Class.forName(metadata.getStreamNameSelectorCls())
+ .getConstructor(Properties.class)
+ .newInstance(metadata.getStreamNameSelectorProp());
+ } catch (Exception ex) {
LOG.error("error initializing StreamNameSelector object", ex);
throw new IllegalStateException(ex);
}
}
/**
- * Assume tuple is composed of topic + map of key/value
- * @param tuple
- * @return
+ * Assume tuple is composed of topic + map of key/value.
*/
- @SuppressWarnings({ "unchecked" })
- public List<Object> convert(List<Object> tuple){
- Map<String, Object> m = (Map<String, Object>)tuple.get(1);
+ @SuppressWarnings( {"unchecked"})
+ public List<Object> convert(List<Object> tuple) {
+ Map<String, Object> m = (Map<String, Object>) tuple.get(1);
String streamName = cachedSelector.getStreamName(m);
- if(!metadata.getActiveStreamNames().contains(streamName)) {
- if(LOG.isDebugEnabled()) {
+ if (!metadata.getActiveStreamNames().contains(streamName)) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("streamName {} is not within activeStreamNames {}", streamName, metadata.getActiveStreamNames());
}
return null;
@@ -81,14 +78,15 @@ public class Tuple2StreamConverter {
LOG.debug("continue with current timestamp becuase no data format sepcified! Metadata : {} ", metadata);
}
timestamp = System.currentTimeMillis();
- } else
-
- try {
- SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat());
- timestamp = sdf.parse(timestampFieldValue).getTime();
- } catch (Exception ex) {
- LOG.error("continue with current timestamp because error happens while parsing timestamp column " + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat());
- timestamp = System.currentTimeMillis();
+ } else {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat());
+ timestamp = sdf.parse(timestampFieldValue).getTime();
+ } catch (Exception ex) {
+ LOG.error("continue with current timestamp because error happens while parsing timestamp column "
+ + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat());
+ timestamp = System.currentTimeMillis();
+ }
}
}
return Arrays.asList(tuple.get(0), streamName, timestamp, tuple.get(1));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
index bde4fe3..788547d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
@@ -21,21 +21,23 @@ import java.util.Properties;
import java.util.Set;
/**
- * @Since 4/25/16. This metadata controls how tuple is transformed to stream for
- * example raw data consists of {"metric" : "cpuUsage", "host" :
- * "xyz.com", "timestamp" : 1346846400, "value" : "0.9"} field "metric"
- * is used for creating stream name, here "cpuUsage" is stream name
+ * This metadata controls how tuple is transformed to stream for
+ * example raw data consists of {"metric" : "cpuUsage", "host" :
+ * "xyz.com", "timestamp" : 1346846400, "value" : "0.9"} field "metric"
+ * is used for creating stream name, here "cpuUsage" is stream name
*
- * metric could be "cpuUsage", "diskUsage", "memUsage" etc, so
- * activeStreamNames are subset of all metric names
+ * <p>metric could be "cpuUsage", "diskUsage", "memUsage" etc, so
+ * activeStreamNames are subset of all metric names</p>
*
- * All other messages which are not one of activeStreamNames will be
- * filtered out
+ * <p>All other messages which are not one of activeStreamNames will be
+ * filtered out.</p>
+ *
+ * @since 4/25/16
*/
public class Tuple2StreamMetadata {
/**
* only messages belonging to activeStreamNames will be kept while
- * transforming tuple into stream
+ * transforming tuple into stream.
*/
private Set<String> activeStreamNames = new HashSet<String>();
// the specific stream name selector
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
index f4b8ccb..bbd4178 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
@@ -18,10 +18,6 @@ package org.apache.eagle.alert.coordination.model;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-/**
- * @since May 25, 2016
- *
- */
public class VersionedPolicyDefinition {
private String version;
private PolicyDefinition definition;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
index 2770aa1..c9f830b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
@@ -18,10 +18,6 @@ package org.apache.eagle.alert.coordination.model;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-/**
- * @since May 25, 2016
- *
- */
public class VersionedStreamDefinition {
private String version;
private StreamDefinition definition;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
index 3f6f36d..9353dbd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
@@ -63,8 +63,8 @@ public class WorkSlot {
return false;
}
WorkSlot workSlot = (WorkSlot) other;
- return Objects.equals(topologyName, workSlot.topologyName) &&
- Objects.equals(boltId, workSlot.boltId);
+ return Objects.equals(topologyName, workSlot.topologyName)
+ && Objects.equals(boltId, workSlot.boltId);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
index beda896..e72836e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
@@ -16,22 +16,19 @@
*/
package org.apache.eagle.alert.coordination.model.internal;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
/**
* A monitored stream is the unique data set in the system.
- *
- * It's a combination of stream and the specific grp-by on it.
- *
- * For correlation stream, it means multiple stream for a given monitored stream.
- *
- *
- * @since Apr 27, 2016
*
+ * <p>It's a combination of stream and the specific grp-by on it.
+ *
+ * <p>For correlation stream, it means multiple stream for a given monitored stream.
+ *
+ * @since Apr 27, 2016
*/
public class MonitoredStream {
@@ -40,7 +37,7 @@ public class MonitoredStream {
// the stream group that this monitored stream stands for
private StreamGroup streamGroup = new StreamGroup();
private List<StreamWorkSlotQueue> queues = new ArrayList<StreamWorkSlotQueue>();
-
+
public MonitoredStream() {
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
index 3e956ca..7747d58 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
@@ -17,10 +17,9 @@
package org.apache.eagle.alert.coordination.model.internal;
/**
- * monitor metadata
- *
- * @since Apr 27, 2016
+ * monitor metadata.
*
+ * @since Apr 27, 2016
*/
public class PolicyAssignment {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
index a1efbf9..2462119 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
@@ -17,12 +17,9 @@
package org.apache.eagle.alert.coordination.model.internal;
/**
- *
- * This is the Base part of ScheduleState, only contains version/generateTime/code/message/scheduleTimeMillis
- *
+ * This is the Base part of ScheduleState, only contains version/generateTime/code/message/scheduleTimeMillis.
*
* @since Aug 10, 2016
- *
*/
public class ScheduleStateBase {
private String version;
@@ -81,6 +78,4 @@ public class ScheduleStateBase {
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
index d87d62b..7941b85 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
@@ -16,26 +16,23 @@
*/
package org.apache.eagle.alert.coordination.model.internal;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Objects;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
-/**
- * @since May 6, 2016
- *
- */
public class StreamGroup {
private List<StreamPartition> streamPartitions = new ArrayList<StreamPartition>();
-
+
public StreamGroup() {
}
-
+
public List<StreamPartition> getStreamPartitions() {
return streamPartitions;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
index f4f6142..86b150c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
@@ -16,22 +16,20 @@
*/
package org.apache.eagle.alert.coordination.model.internal;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
/**
* A work queue for given one monitored stream.
- *
- * Analog to storm's "tasks for given bolt".
- *
- * @since Apr 27, 2016
*
+ * <p>Analog to storm's "tasks for given bolt".
+ *
+ * @since Apr 27, 2016
*/
public class StreamWorkSlotQueue {
private String queueId;
@@ -40,15 +38,15 @@ public class StreamWorkSlotQueue {
private boolean dedicated;
// some dedicated option, like dedicated userId/tenantId/policyId.
private Map<String, Object> dedicateOption;
-
+
private int numberOfGroupBolts;
- private Map<String, Integer> topoGroupStartIndex = new HashMap<String, Integer>();
+ private Map<String, Integer> topoGroupStartIndex = new HashMap<String, Integer>();
public StreamWorkSlotQueue() {
}
-
+
public StreamWorkSlotQueue(StreamGroup par, boolean isDedicated, Map<String, Object> options,
- List<WorkSlot> slots) {
+ List<WorkSlot> slots) {
this.queueId = par.getStreamId() + System.currentTimeMillis();// simply generate a queue
this.dedicated = isDedicated;
dedicateOption = new HashMap<String, Object>();
@@ -81,11 +79,11 @@ public class StreamWorkSlotQueue {
return workingSlots.size();
}
-// @org.codehaus.jackson.annotate.JsonIgnore
-// @JsonIgnore
-// public void placePolicy(PolicyDefinition pd) {
-// policies.add(pd.getName());
-// }
+ // @org.codehaus.jackson.annotate.JsonIgnore
+ // @JsonIgnore
+ // public void placePolicy(PolicyDefinition pd) {
+ // policies.add(pd.getName());
+ // }
public int getNumberOfGroupBolts() {
return numberOfGroupBolts;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
index 189e2a5..c41c867 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
@@ -20,10 +20,12 @@ import java.util.HashSet;
import java.util.Set;
/**
- * @since Mar 24, 2016 Logically one unit topology consists of S spouts, G
- * groupby bolts, A alertBolts normally S=1 Physically each spout is
- * composed of s spout nodes, each groupby bolt is composed of g groupby
- * nodes, and each alert bolt is composed of a alert nodes
+ * Logically one unit topology consists of S spouts, G
+ * groupby bolts, A alertBolts normally S=1 Physically each spout is
+ * composed of s spout nodes, each groupby bolt is composed of g groupby
+ * nodes, and each alert bolt is composed of a alert nodes.
+ *
+ * @since Mar 24, 2016
*/
public class Topology {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java
index dd26d20..ac375e1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java
@@ -19,8 +19,7 @@ package org.apache.eagle.alert.engine.codec;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
/**
- * @since Jun 3, 2016
- *
+ * @since Jun 3, 2016.
*/
public interface IEventSerializer {
Object serialize(AlertStreamEvent event);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index c54955f..680b21a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -25,11 +25,10 @@ import java.io.Serializable;
import java.util.*;
/**
- * @since Apr 5, 2016
- *
+ * @since Apr 5, 2016.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
-public class PolicyDefinition implements Serializable{
+public class PolicyDefinition implements Serializable {
private static final long serialVersionUID = 377581499339572414L;
// unique identifier
private String name;
@@ -116,48 +115,50 @@ public class PolicyDefinition implements Serializable{
}
public PolicyStatus getPolicyStatus() {
- return policyStatus;
- }
+ return policyStatus;
+ }
- public void setPolicyStatus(PolicyStatus policyStatus) {
- this.policyStatus = policyStatus;
- }
+ public void setPolicyStatus(PolicyStatus policyStatus) {
+ this.policyStatus = policyStatus;
+ }
- @Override
+ @Override
public int hashCode() {
- return new HashCodeBuilder().
- append(name).
- append(inputStreams).
- append(outputStreams).
- append(definition).
- append(partitionSpec).
-// append(parallelismHint).
- build();
+ return new HashCodeBuilder()
+ .append(name)
+ .append(inputStreams)
+ .append(outputStreams)
+ .append(definition)
+ .append(partitionSpec)
+ // .append(parallelismHint)
+ .build();
}
@Override
- public boolean equals(Object that){
- if(that == this)
+ public boolean equals(Object that) {
+ if (that == this) {
return true;
- if(! (that instanceof PolicyDefinition))
+ }
+ if (!(that instanceof PolicyDefinition)) {
return false;
- PolicyDefinition another = (PolicyDefinition)that;
- if(Objects.equals(another.name, this.name) &&
- Objects.equals(another.description, this.description) &&
- CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) &&
- CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) &&
- another.definition.equals(this.definition) &&
- Objects.equals(this.definition, another.definition) &&
- CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
-// && another.parallelismHint == this.parallelismHint
- ) {
+ }
+ PolicyDefinition another = (PolicyDefinition) that;
+ if (Objects.equals(another.name, this.name)
+ && Objects.equals(another.description, this.description)
+ && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
+ && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
+ && another.definition.equals(this.definition)
+ && Objects.equals(this.definition, another.definition)
+ && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
+ // && another.parallelismHint == this.parallelismHint
+ ) {
return true;
}
return false;
}
@JsonIgnoreProperties(ignoreUnknown = true)
- public static class Definition implements Serializable{
+ public static class Definition implements Serializable {
private static final long serialVersionUID = -622366527887848346L;
public String type;
@@ -168,7 +169,7 @@ public class PolicyDefinition implements Serializable{
private List<String> inputStreams = new ArrayList<String>();
private List<String> outputStreams = new ArrayList<String>();
- public Definition(String type,String value){
+ public Definition(String type, String value) {
this.type = type;
this.value = value;
}
@@ -184,17 +185,20 @@ public class PolicyDefinition implements Serializable{
}
@Override
- public boolean equals(Object that){
- if(that == this)
+ public boolean equals(Object that) {
+ if (that == this) {
return true;
- if(!(that instanceof Definition))
+ }
+ if (!(that instanceof Definition)) {
return false;
- Definition another = (Definition)that;
- if(another.type.equals(this.type)
- && another.value.equals(this.value)
- && ListUtils.isEqualList(another.inputStreams, this.inputStreams)
- && ListUtils.isEqualList(another.outputStreams, this.outputStreams))
+ }
+ Definition another = (Definition) that;
+ if (another.type.equals(this.type)
+ && another.value.equals(this.value)
+ && ListUtils.isEqualList(another.inputStreams, this.inputStreams)
+ && ListUtils.isEqualList(another.outputStreams, this.outputStreams)) {
return true;
+ }
return false;
}
@@ -248,16 +252,16 @@ public class PolicyDefinition implements Serializable{
@Override
public String toString() {
- return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }",type,value, inputStreams, outputStreams);
+ return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }", type, value, inputStreams, outputStreams);
}
}
-
+
public static enum PolicyStatus {
- ENABLED, DISABLED
+ ENABLED, DISABLED
}
@Override
public String toString() {
- return String.format("{name=\"%s\",definition=%s}",this.getName(),this.getDefinition()==null?"null": this.getDefinition().toString());
+ return String.format("{name=\"%s\",definition=%s}", this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index e3b4e33..0bada4e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -24,8 +24,7 @@ import java.util.Map;
import java.util.Objects;
/**
- * @since Apr 11, 2016
- *
+ * @since Apr 11, 2016.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Publishment {
@@ -100,8 +99,8 @@ public class Publishment {
if (obj instanceof Publishment) {
Publishment p = (Publishment) obj;
return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType())
- && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
- && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties()));
+ && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
+ && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties()));
}
return false;
}
@@ -109,14 +108,14 @@ public class Publishment {
@Override
public int hashCode() {
return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(policyIds)
- .append(properties).build();
+ .append(properties).build();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
- .append(policyIds).append(",properties:").append(properties);
+ .append(policyIds).append(",properties:").append(properties);
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
index daecab4..5329dfa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -18,11 +18,11 @@
package org.apache.eagle.alert.engine.coordinator;
-import java.util.Objects;
-
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import java.util.Objects;
+
@JsonIgnoreProperties(ignoreUnknown = true)
public class PublishmentType {
@@ -39,17 +39,19 @@ public class PublishmentType {
this.type = type;
}
- public String getClassName(){
+ public String getClassName() {
return className;
}
- public void setClassName(String className){
+
+ public void setClassName(String className) {
this.className = className;
}
- public String getDescription(){
+ public String getDescription() {
return description;
}
- public void setDescription(String description){
+
+ public void setDescription(String description) {
this.description = description;
}
@@ -65,10 +67,10 @@ public class PublishmentType {
public boolean equals(Object obj) {
if (obj instanceof PublishmentType) {
PublishmentType p = (PublishmentType) obj;
- return (Objects.equals(className, p.getClassName()) &&
- Objects.equals(type, p.type) &&
- Objects.equals(description, p.getDescription()) &&
- Objects.equals(fields, p.getFields()));
+ return (Objects.equals(className, p.getClassName())
+ && Objects.equals(type, p.type)
+ && Objects.equals(description, p.getDescription())
+ && Objects.equals(fields, p.getFields()));
}
return false;
}
@@ -76,10 +78,10 @@ public class PublishmentType {
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(className)
- .append(type)
- .append(description)
- .append(fields)
- .build();
+ .append(className)
+ .append(type)
+ .append(description)
+ .append(fields)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 4483fe4..2be4936 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -16,16 +16,14 @@
*/
package org.apache.eagle.alert.engine.coordinator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
-
import javax.xml.bind.annotation.adapters.XmlAdapter;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-
public class StreamColumn implements Serializable {
private static final long serialVersionUID = -5457861313624389106L;
private String name;
@@ -36,19 +34,19 @@ public class StreamColumn implements Serializable {
private String nodataExpression;
public String toString() {
- return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]",
- name, type, defaultValue, required, nodataExpression);
+ return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]",
+ name, type, defaultValue, required, nodataExpression);
}
public String getNodataExpression() {
- return nodataExpression;
- }
+ return nodataExpression;
+ }
- public void setNodataExpression(String nodataExpression) {
- this.nodataExpression = nodataExpression;
- }
+ public void setNodataExpression(String nodataExpression) {
+ this.nodataExpression = nodataExpression;
+ }
- public String getName() {
+ public String getName() {
return name;
}
@@ -71,7 +69,7 @@ public class StreamColumn implements Serializable {
}
private void ensureDefaultValueType() {
- if(this.getDefaultValue()!=null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING){
+ if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) {
switch (this.getType()) {
case INT:
this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue()));
@@ -90,11 +88,13 @@ public class StreamColumn implements Serializable {
break;
case OBJECT:
try {
- this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(),HashMap.class));
+ this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
break;
+ default:
+ throw new IllegalArgumentException("Illegal type: " + this.getType());
}
}
}
@@ -145,7 +145,7 @@ public class StreamColumn implements Serializable {
}
}
- public static class StreamColumnTypeAdapter extends XmlAdapter<String,Type>{
+ public static class StreamColumnTypeAdapter extends XmlAdapter<String, Type> {
@Override
public Type unmarshal(String v) throws Exception {
@@ -158,7 +158,7 @@ public class StreamColumn implements Serializable {
}
}
- public static class DefaultValueAdapter extends XmlAdapter<String,Object>{
+ public static class DefaultValueAdapter extends XmlAdapter<String, Object> {
@Override
public Object unmarshal(String v) throws Exception {
return v;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
index beb8491..9130951 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -16,18 +16,18 @@
*/
package org.apache.eagle.alert.engine.coordinator;
-import javax.xml.bind.annotation.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
/**
* This is actually a data source schema.
- *
- * @since Apr 5, 2016
*
+ * @since Apr 5, 2016
*/
-public class StreamDefinition implements Serializable{
+public class StreamDefinition implements Serializable {
private static final long serialVersionUID = 2352202882328931825L;
private String streamId;
private String dataSource;
@@ -37,14 +37,14 @@ public class StreamDefinition implements Serializable{
private List<StreamColumn> columns = new ArrayList<>();
- public String toString(){
+ public String toString() {
return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s",
- streamId,
- dataSource,
- description,
- validate,
- timeseries,
- columns);
+ streamId,
+ dataSource,
+ description,
+ validate,
+ timeseries,
+ columns);
}
public String getStreamId() {
@@ -79,7 +79,7 @@ public class StreamDefinition implements Serializable{
this.timeseries = timeseries;
}
- @XmlElementWrapper(name="columns")
+ @XmlElementWrapper(name = "columns")
@XmlElement(name = "column")
public List<StreamColumn> getColumns() {
return columns;
@@ -97,10 +97,12 @@ public class StreamDefinition implements Serializable{
this.dataSource = dataSource;
}
- public int getColumnIndex(String column){
- int i=0;
- for(StreamColumn col:this.getColumns()){
- if(col.getName().equals(column)) return i;
+ public int getColumnIndex(String column) {
+ int i = 0;
+ for (StreamColumn col : this.getColumns()) {
+ if (col.getName().equals(column)) {
+ return i;
+ }
i++;
}
return -1;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
index 47e15c0..0987463 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
@@ -16,20 +16,19 @@
*/
package org.apache.eagle.alert.engine.coordinator;
-import java.io.Serializable;
-import java.util.*;
-
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import java.io.Serializable;
+import java.util.*;
/**
* StreamPartition defines how a data stream is partitioned and sorted
* streamId is used for distinguishing different streams which are spawned from the same data source
* type defines how to partition data among slots within one slotqueue
* columns are fields based on which stream is grouped
- * sortSpec defines how data is sorted
+ * sortSpec defines how data is sorted.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamPartition implements Serializable {
@@ -52,14 +51,15 @@ public class StreamPartition implements Serializable {
@Override
public boolean equals(Object other) {
- if (other == this)
+ if (other == this) {
return true;
+ }
if (!(other instanceof StreamPartition)) {
return false;
}
StreamPartition sp = (StreamPartition) other;
return Objects.equals(streamId, sp.streamId) && Objects.equals(type, sp.type)
- && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec);
+ && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec);
}
@Override
@@ -71,46 +71,52 @@ public class StreamPartition implements Serializable {
this.type = type;
}
- public Type getType(){
+ public Type getType() {
return this.type;
}
- public enum Type{
- GLOBAL("GLOBAL",0), GROUPBY("GROUPBY",1), SHUFFLE("SHUFFLE",2);
+ public enum Type {
+ GLOBAL("GLOBAL", 0), GROUPBY("GROUPBY", 1), SHUFFLE("SHUFFLE", 2);
private final String name;
private final int index;
- Type(String name, int index){
+
+ Type(String name, int index) {
this.name = name;
this.index = index;
}
+
@Override
public String toString() {
return this.name;
}
- public static Type locate(String type){
+
+ public static Type locate(String type) {
Type _type = _NAME_TYPE.get(type.toUpperCase());
- if(_type == null)
- throw new IllegalStateException("Illegal type name: "+type);
+ if (_type == null) {
+ throw new IllegalStateException("Illegal type name: " + type);
+ }
return _type;
}
- public static Type locate(int index){
+ public static Type locate(int index) {
Type _type = _INDEX_TYPE.get(index);
- if(_type == null)
- throw new IllegalStateException("Illegal type index: "+index);
+ if (_type == null) {
+ throw new IllegalStateException("Illegal type index: " + index);
+ }
return _type;
}
- private static final Map<String,Type> _NAME_TYPE = new HashMap<>();
- private static final Map<Integer,Type> _INDEX_TYPE = new TreeMap<>();
+ private static final Map<String, Type> _NAME_TYPE = new HashMap<>();
+ private static final Map<Integer, Type> _INDEX_TYPE = new TreeMap<>();
+
static {
- _NAME_TYPE.put(GLOBAL.name,GLOBAL);
- _NAME_TYPE.put(GROUPBY.name,GROUPBY);
- _NAME_TYPE.put(SHUFFLE.name,SHUFFLE);
+ _NAME_TYPE.put(GLOBAL.name, GLOBAL);
+ _NAME_TYPE.put(GROUPBY.name, GROUPBY);
+ _NAME_TYPE.put(SHUFFLE.name, SHUFFLE);
- _INDEX_TYPE.put(GLOBAL.index,GLOBAL);
- _INDEX_TYPE.put(GROUPBY.index,GLOBAL);
- _INDEX_TYPE.put(SHUFFLE.index,GLOBAL);
+ _INDEX_TYPE.put(GLOBAL.index, GLOBAL);
+ _INDEX_TYPE.put(GROUPBY.index, GLOBAL);
+ _INDEX_TYPE.put(SHUFFLE.index, GLOBAL);
}
}
@@ -140,6 +146,6 @@ public class StreamPartition implements Serializable {
@Override
public String toString() {
- return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]",this.getStreamId(),this.getType(), StringUtils.join(this.getColumns(),","), sortSpec);
+ return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]", this.getStreamId(), this.getType(), StringUtils.join(this.getColumns(), ","), sortSpec);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
index 962a8ee..65b9151 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
@@ -16,24 +16,24 @@
*/
package org.apache.eagle.alert.engine.coordinator;
-
+import org.apache.eagle.alert.utils.TimePeriodUtils;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
import org.joda.time.Period;
import java.io.Serializable;
/**
- * streamId is the key
+ * streamId is the key.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
-public class StreamSortSpec implements Serializable{
+public class StreamSortSpec implements Serializable {
private static final long serialVersionUID = 3626506441441584937L;
- private String windowPeriod="";
+ private String windowPeriod = "";
private int windowMargin = 30 * 1000; // 30 seconds by default
- public StreamSortSpec() {}
+ public StreamSortSpec() {
+ }
public StreamSortSpec(StreamSortSpec spec) {
this.windowPeriod = spec.windowPeriod;
@@ -45,14 +45,17 @@ public class StreamSortSpec implements Serializable{
}
public int getWindowPeriodMillis() {
- if(windowPeriod!=null) {
+ if (windowPeriod != null) {
return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
- } else return 0;
+ } else {
+ return 0;
+ }
}
public void setWindowPeriod(String windowPeriod) {
this.windowPeriod = windowPeriod;
}
+
public void setWindowPeriodMillis(int windowPeriodMillis) {
this.windowPeriod = Period.millis(windowPeriodMillis).toString();
}
@@ -71,30 +74,32 @@ public class StreamSortSpec implements Serializable{
}
@Override
- public int hashCode(){
- return new HashCodeBuilder().
- append(windowPeriod).
- append(windowMargin).toHashCode();
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(windowPeriod)
+ .append(windowMargin)
+ .toHashCode();
}
@Override
- public boolean equals(Object that){
- if(this == that)
+ public boolean equals(Object that) {
+ if (this == that) {
return true;
- if(!(that instanceof StreamSortSpec)){
+ }
+ if (!(that instanceof StreamSortSpec)) {
return false;
}
- StreamSortSpec another = (StreamSortSpec)that;
- return
- another.windowPeriod.equals(this.windowPeriod) &&
- another.windowMargin == this.windowMargin;
+ StreamSortSpec another = (StreamSortSpec) that;
+ return
+ another.windowPeriod.equals(this.windowPeriod)
+ && another.windowMargin == this.windowMargin;
}
@Override
- public String toString(){
+ public String toString() {
return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
- this.getWindowPeriod(),
- this.getWindowMargin());
+ this.getWindowPeriod(),
+ this.getWindowMargin());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
index 6cafb16..1e40309 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
@@ -16,13 +16,11 @@
*/
package org.apache.eagle.alert.engine.coordinator;
-import java.util.Map;
-
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
/**
- * @since Apr 5, 2016
- *
+ * @since Apr 5, 2016.
*/
public class StreamingCluster {
public static enum StreamingType {
@@ -38,7 +36,7 @@ public class StreamingCluster {
@JsonProperty
private String description;
/**
- * key - nimbus for storm
+ * key - nimbus for storm.
*/
@JsonProperty
private Map<String, String> deployments;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index 13881a1..a503dcf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -16,15 +16,14 @@
*/
package org.apache.eagle.alert.engine.model;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.utils.DateTimeUtil;
-
+import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
- * streamId stands for alert type instead of source event streamId
+ * streamId stands for alert type instead of source event streamId.
*/
public class AlertStreamEvent extends StreamEvent {
private static final long serialVersionUID = 2392131134670106397L;
@@ -45,15 +44,16 @@ public class AlertStreamEvent extends StreamEvent {
@Override
public String toString() {
List<String> dataStrings = new ArrayList<>(this.getData().length);
- for(Object obj: this.getData()){
- if(obj!=null) {
+ for (Object obj : this.getData()) {
+ if (obj != null) {
dataStrings.add(obj.toString());
- }else{
+ } else {
dataStrings.add(null);
}
}
return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policyId=%s, createdBy=%s, metaVersion=%s]",
- this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","),this.getPolicyId(),this.getCreatedBy(),this.getMetaVersion());
+ this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+ StringUtils.join(dataStrings, ","), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion());
}
public String getCreatedBy() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
index cfed3e2..51e4532 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
@@ -16,21 +16,19 @@
*/
package org.apache.eagle.alert.engine.model;
-import java.io.Serializable;
-import java.util.Objects;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
import backtype.storm.tuple.Tuple;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import java.io.Serializable;
+import java.util.Objects;
/**
* This is a critical data structure across spout, router bolt and alert bolt
* partition[StreamPartition] defines how one incoming data stream is partitioned, sorted
* partitionKey[long] is java hash value of groupby fields. The groupby fields are defined in StreamPartition
- * event[StreamEvent] is actual data
+ * event[StreamEvent] is actual data.
*/
-public class PartitionedEvent implements Serializable{
+public class PartitionedEvent implements Serializable {
private static final long serialVersionUID = -3840016190614238593L;
private StreamPartition partition;
private long partitionKey;
@@ -38,11 +36,11 @@ public class PartitionedEvent implements Serializable{
/**
* Used for bolt-internal but not inter-bolts,
- * will not pass across bolts
+ * will not pass across bolts.
*/
private transient Tuple anchor;
- public PartitionedEvent(){
+ public PartitionedEvent() {
this.event = null;
this.partition = null;
this.partitionKey = 0L;
@@ -56,14 +54,18 @@ public class PartitionedEvent implements Serializable{
@Override
public boolean equals(Object obj) {
- if(obj == this) return true;
- if(obj == null) return false;
- if(obj instanceof PartitionedEvent){
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (obj instanceof PartitionedEvent) {
PartitionedEvent another = (PartitionedEvent) obj;
return !(this.partitionKey != another.getPartitionKey()
- || !Objects.equals(this.event, another.getEvent())
- || !Objects.equals(this.partition, another.getPartition())
- || !Objects.equals(this.anchor, another.anchor));
+ || !Objects.equals(this.event, another.getEvent())
+ || !Objects.equals(this.partition, another.getPartition())
+ || !Objects.equals(this.anchor, another.anchor));
} else {
return false;
}
@@ -72,10 +74,10 @@ public class PartitionedEvent implements Serializable{
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(partitionKey)
- .append(event)
- .append(partition)
- .build();
+ .append(partitionKey)
+ .append(event)
+ .append(partition)
+ .build();
}
public StreamEvent getEvent() {
@@ -94,36 +96,36 @@ public class PartitionedEvent implements Serializable{
this.partition = partition;
}
- public void setPartitionKey(long partitionKey){
+ public void setPartitionKey(long partitionKey) {
this.partitionKey = partitionKey;
}
- public long getPartitionKey(){
+ public long getPartitionKey() {
return this.partitionKey;
}
- public String toString(){
- return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event,partitionKey);
+ public String toString() {
+ return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event, partitionKey);
}
public long getTimestamp() {
return (event != null) ? event.getTimestamp() : 0L;
}
- public String getStreamId(){
+ public String getStreamId() {
return (event != null) ? event.getStreamId() : null;
}
- public Object[] getData(){
- return event!=null ? event.getData() : null;
+ public Object[] getData() {
+ return event != null ? event.getData() : null;
}
- public boolean isSortRequired(){
- return isPartitionRequired() && this.getPartition().getSortSpec()!=null;
+ public boolean isSortRequired() {
+ return isPartitionRequired() && this.getPartition().getSortSpec() != null;
}
- public boolean isPartitionRequired(){
- return this.getPartition()!=null;
+ public boolean isPartitionRequired() {
+ return this.getPartition() != null;
}
public PartitionedEvent copy() {
@@ -142,7 +144,7 @@ public class PartitionedEvent implements Serializable{
this.anchor = anchor;
}
- public PartitionedEvent withAnchor(Tuple tuple){
+ public PartitionedEvent withAnchor(Tuple tuple) {
this.setAnchor(tuple);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index 5f59b1e..d91b001 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -16,11 +16,10 @@
*/
package org.apache.eagle.alert.engine.model;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.utils.DateTimeUtil;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -28,8 +27,7 @@ import java.util.List;
import java.util.Objects;
/**
- * @since Apr 5, 2016
- *
+ * @since Apr 5, 2016.
*/
public class StreamEvent implements Serializable {
private static final long serialVersionUID = 2765116509856609763L;
@@ -39,15 +37,16 @@ public class StreamEvent implements Serializable {
private long timestamp;
private String metaVersion;
- public StreamEvent(){}
+ public StreamEvent() {
+ }
- public StreamEvent(String streamId,long timestamp,Object[] data){
+ public StreamEvent(String streamId, long timestamp, Object[] data) {
this.setStreamId(streamId);
this.setTimestamp(timestamp);
this.setData(data);
}
- public StreamEvent(String streamId,long timestamp,Object[] data,String metaVersion){
+ public StreamEvent(String streamId, long timestamp, Object[] data, String metaVersion) {
this.setStreamId(streamId);
this.setTimestamp(timestamp);
this.setData(data);
@@ -62,9 +61,6 @@ public class StreamEvent implements Serializable {
this.streamId = streamId;
}
- public Object[] getData() {
- return data;
- }
public void setData(Object[] data) {
this.data = data;
@@ -93,10 +89,12 @@ public class StreamEvent implements Serializable {
@Override
public boolean equals(Object obj) {
- if(obj == this) return true;
- if(obj instanceof StreamEvent){
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof StreamEvent) {
StreamEvent another = (StreamEvent) obj;
- return Objects.equals(this.streamId,another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data,another.data);
+ return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data, another.data);
}
return false;
}
@@ -104,7 +102,7 @@ public class StreamEvent implements Serializable {
@Override
public String toString() {
List<String> dataStrings = new ArrayList<>();
- if(this.getData() != null) {
+ if (this.getData() != null) {
for (Object obj : this.getData()) {
if (obj != null) {
dataStrings.add(obj.toString());
@@ -113,17 +111,21 @@ public class StreamEvent implements Serializable {
}
}
}
- return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","), this.getMetaVersion());
+ return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",
+ this.getStreamId(),
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+ StringUtils.join(dataStrings, ","),
+ this.getMetaVersion());
}
- public static StreamEventBuilder Builder(){
+ public static StreamEventBuilder builder() {
return new StreamEventBuilder();
}
/**
- * @return cloned new event object
+ * @return cloned new event object.
*/
- public StreamEvent copy(){
+ public StreamEvent copy() {
StreamEvent newEvent = new StreamEvent();
newEvent.setTimestamp(this.getTimestamp());
newEvent.setData(this.getData());
@@ -132,19 +134,18 @@ public class StreamEvent implements Serializable {
return newEvent;
}
- public void copyFrom(StreamEvent event){
+ public void copyFrom(StreamEvent event) {
this.setTimestamp(event.getTimestamp());
this.setData(event.getData());
this.setStreamId(event.getStreamId());
this.setMetaVersion(event.getMetaVersion());
}
- /**
- * @param column
- * @param streamDefinition
- * @return
- */
- public Object[] getData(StreamDefinition streamDefinition,List<String> column) {
+ public Object[] getData() {
+ return data;
+ }
+
+ public Object[] getData(StreamDefinition streamDefinition, List<String> column) {
ArrayList<Object> result = new ArrayList<>(column.size());
for (String colName : column) {
result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
@@ -152,7 +153,7 @@ public class StreamEvent implements Serializable {
return result.toArray();
}
- public Object[] getData(StreamDefinition streamDefinition,String ... column) {
+ public Object[] getData(StreamDefinition streamDefinition, String... column) {
ArrayList<Object> result = new ArrayList<>(column.length);
for (String colName : column) {
result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
index 1036ba2..53101ef 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
@@ -25,62 +25,65 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class StreamEventBuilder{
- private final static Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);
+public class StreamEventBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);
private StreamEvent instance;
private StreamDefinition streamDefinition;
- public StreamEventBuilder(){
+
+ public StreamEventBuilder() {
instance = new StreamEvent();
}
- public StreamEventBuilder schema(StreamDefinition streamDefinition){
+ public StreamEventBuilder schema(StreamDefinition streamDefinition) {
this.streamDefinition = streamDefinition;
- if(instance.getStreamId() == null) instance.setStreamId(streamDefinition.getStreamId());
+ if (instance.getStreamId() == null) {
+ instance.setStreamId(streamDefinition.getStreamId());
+ }
return this;
}
- public StreamEventBuilder streamId(String streamId){
+ public StreamEventBuilder streamId(String streamId) {
instance.setStreamId(streamId);
return this;
}
- public StreamEventBuilder attributes(Map<String,Object> data, StreamDefinition streamDefinition){
+ public StreamEventBuilder attributes(Map<String, Object> data, StreamDefinition streamDefinition) {
this.schema(streamDefinition);
List<StreamColumn> columnList = streamDefinition.getColumns();
- if(columnList!=null && columnList.size() > 0){
+ if (columnList != null && columnList.size() > 0) {
List<Object> values = new ArrayList<>(columnList.size());
for (StreamColumn column : columnList) {
- values.add(data.getOrDefault(column.getName(),column.getDefaultValue()));
+ values.add(data.getOrDefault(column.getName(), column.getDefaultValue()));
}
instance.setData(values.toArray());
- } else if(LOG.isDebugEnabled()){
- LOG.warn("All data [{}] are ignored as no columns defined in schema {}",data,streamDefinition);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.warn("All data [{}] are ignored as no columns defined in schema {}", data, streamDefinition);
}
return this;
}
- public StreamEventBuilder attributes(Map<String,Object> data){
- return attributes(data,this.streamDefinition);
+ public StreamEventBuilder attributes(Map<String, Object> data) {
+ return attributes(data, this.streamDefinition);
}
- public StreamEventBuilder attributes(Object ... data){
+ public StreamEventBuilder attributes(Object... data) {
instance.setData(data);
return this;
}
- public StreamEventBuilder timestamep(long timestamp){
+ public StreamEventBuilder timestamep(long timestamp) {
instance.setTimestamp(timestamp);
return this;
}
- public StreamEventBuilder metaVersion(String metaVersion){
+ public StreamEventBuilder metaVersion(String metaVersion) {
instance.setMetaVersion(metaVersion);
return this;
}
- public StreamEvent build(){
- if(instance.getStreamId() == null){
+ public StreamEvent build() {
+ if (instance.getStreamId() == null) {
throw new IllegalArgumentException("streamId is null of event: " + instance);
}
return instance;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
index 06a99f4..461a23c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
@@ -1,14 +1,4 @@
-package org.apache.eagle.alert.metric;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.metric.sink.MetricSink;
-import org.apache.eagle.alert.metric.source.MetricSource;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -24,41 +14,48 @@ import com.typesafe.config.Config;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.eagle.alert.metric;
+
+import org.apache.eagle.alert.metric.sink.MetricSink;
+import org.apache.eagle.alert.metric.source.MetricSource;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import java.util.Map;
+
public interface IMetricSystem {
/**
- * Initialize
+ * Initialize.
*/
void start();
/**
- * Schedule reporter
+ * Schedule reporter.
*/
void schedule();
/**
- * Close and stop all resources and services
+ * Close and stop all resources and services.
*/
void stop();
/**
- * Manual report metric
+ * Manual report metric.
*/
void report();
/**
- *
- * @param sink metric sink
+ * @param sink metric sink.
*/
- void register(MetricSink sink,Config config);
+ void register(MetricSink sink, Config config);
/**
- *
- * @param source metric source
+ * @param source metric source.
*/
void register(MetricSource source);
- void tags(Map<String,Object> metricTags);
+ void tags(Map<String, Object> metricTags);
MetricRegistry registry();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
index b91c606..555c4ec 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,74 +16,74 @@
*/
package org.apache.eagle.alert.metric;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
import org.apache.eagle.alert.metric.sink.MetricSink;
import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
import org.apache.eagle.alert.metric.source.MetricSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
public class MetricSystem implements IMetricSystem {
private final Config config;
- private Map<MetricSink,Config> sinks = new HashMap<>();
-// private Map<String,MetricSource> sources = new HashMap<>();
+ private Map<MetricSink, Config> sinks = new HashMap<>();
+ // private Map<String,MetricSource> sources = new HashMap<>();
private MetricRegistry registry = new MetricRegistry();
private boolean running;
private boolean initialized;
- private final static Logger LOG = LoggerFactory.getLogger(MetricSystem.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MetricSystem.class);
private final Map<String, Object> metricTags = new HashMap<>();
- public MetricSystem(Config config){
+ public MetricSystem(Config config) {
this.config = config;
}
- public static MetricSystem load(Config config){
+ public static MetricSystem load(Config config) {
MetricSystem instance = new MetricSystem(config);
instance.loadFromConfig();
return instance;
}
@Override
- public void tags(Map<String,Object> metricTags){
+ public void tags(Map<String, Object> metricTags) {
this.metricTags.putAll(metricTags);
}
@Override
public void start() {
- if(initialized)
+ if (initialized) {
throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already intialized");
- sinks.forEach((sink,conf) -> sink.prepare(conf.withValue("tags",ConfigFactory.parseMap(metricTags).root()),registry));
+ }
+ sinks.forEach((sink, conf) -> sink.prepare(conf.withValue("tags", ConfigFactory.parseMap(metricTags).root()), registry));
initialized = true;
}
@Override
public void schedule() {
- if(running){
- throw new IllegalStateException("Attempting to start a MetricsSystem that is already running");
+ if (running) {
+ throw new IllegalStateException("Attempting to start a MetricsSystem that is already running");
}
- sinks.keySet().forEach((sink)->sink.start(5, TimeUnit.SECONDS));
+ sinks.keySet().forEach((sink) -> sink.start(5, TimeUnit.SECONDS));
running = true;
}
- public void loadFromConfig(){
+ public void loadFromConfig() {
loadSinksFromConfig();
}
- private void loadSinksFromConfig(){
+ private void loadSinksFromConfig() {
Config sinkCls = config.hasPath("metric.sink") ? config.getConfig("metric.sink") : null;
- if(sinkCls == null){
+ if (sinkCls == null) {
// do nothing
- }else{
- for(String sinkType:sinkCls.root().unwrapped().keySet()){
- register(MetricSinkRepository.createSink(sinkType),config.getConfig("metric.sink."+sinkType));
+ } else {
+ for (String sinkType : sinkCls.root().unwrapped().keySet()) {
+ register(MetricSinkRepository.createSink(sinkType), config.getConfig("metric.sink." + sinkType));
}
}
}
@@ -99,9 +99,9 @@ public class MetricSystem implements IMetricSystem {
}
@Override
- public void register(MetricSink sink,Config config) {
- LOG.debug("Register {}",sink);
- sinks.put(sink,config);
+ public void register(MetricSink sink, Config config) {
+ LOG.debug("Register {}", sink);
+ sinks.put(sink, config);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
index b5e6c63..f1262c7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
@@ -16,55 +16,54 @@
*/
package org.apache.eagle.alert.metric.entity;
-import java.util.Map;
-import java.util.TreeMap;
-
import org.apache.eagle.alert.utils.DateTimeUtil;
-
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
+import java.util.Map;
+import java.util.TreeMap;
-public class MetricEvent extends TreeMap<String,Object>{
+public class MetricEvent extends TreeMap<String, Object> {
private static final long serialVersionUID = 6862373651636342744L;
- public static Builder of(String name){
+ public static Builder of(String name) {
return new Builder(name);
}
/**
- * TODO: Refactor according to ConsoleReporter
+ * TODO: Refactor according to ConsoleReporter.
*/
- public static class Builder{
+ public static class Builder {
private final String name;
private MetricEvent instance;
- public Builder(String name){
+
+ public Builder(String name) {
this.instance = new MetricEvent();
this.name = name;
}
- public Builder from(Counter value) {
-// this.instance.put("type","counter");
- this.instance.put("count",value.getCount());
- return this;
- }
-
- public MetricEvent build(){
- this.instance.put("name",name);
- if(!this.instance.containsKey("timestamp")){
+ public MetricEvent build() {
+ this.instance.put("name", name);
+ if (!this.instance.containsKey("timestamp")) {
this.instance.put("timestamp", DateTimeUtil.getCurrentTimestamp());
}
return this.instance;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Builder from(Counter value) {
+ // this.instance.put("type","counter");
+ this.instance.put("count", value.getCount());
+ return this;
+ }
+
+ @SuppressWarnings( {"rawtypes", "unchecked"})
public Builder from(Gauge gauge) {
Object value = gauge.getValue();
- if( value instanceof Map){
+ if (value instanceof Map) {
Map<? extends String, ?> map = (Map<? extends String, ?>) value;
this.instance.putAll(map);
} else {
@@ -74,7 +73,7 @@ public class MetricEvent extends TreeMap<String,Object>{
}
public Builder from(Histogram value) {
- this.instance.put("count",value.getCount());
+ this.instance.put("count", value.getCount());
Snapshot snapshot = value.getSnapshot();
this.instance.put("min", snapshot.getMin());
this.instance.put("max", snapshot.getMax());
@@ -90,21 +89,21 @@ public class MetricEvent extends TreeMap<String,Object>{
}
public Builder from(Meter value) {
- this.instance.put("value",value.getCount());
- this.instance.put("15MinRate",value.getFifteenMinuteRate());
- this.instance.put("5MinRate",value.getFiveMinuteRate());
- this.instance.put("mean",value.getMeanRate());
- this.instance.put("1MinRate",value.getOneMinuteRate());
+ this.instance.put("value", value.getCount());
+ this.instance.put("15MinRate", value.getFifteenMinuteRate());
+ this.instance.put("5MinRate", value.getFiveMinuteRate());
+ this.instance.put("mean", value.getMeanRate());
+ this.instance.put("1MinRate", value.getOneMinuteRate());
return this;
}
public Builder from(Timer value) {
-// this.instance.put("type","timer");
- this.instance.put("value",value.getCount());
- this.instance.put("15MinRate",value.getFifteenMinuteRate());
- this.instance.put("5MinRate",value.getFiveMinuteRate());
- this.instance.put("mean",value.getMeanRate());
- this.instance.put("1MinRate",value.getOneMinuteRate());
+ // this.instance.put("type","timer");
+ this.instance.put("value", value.getCount());
+ this.instance.put("15MinRate", value.getFifteenMinuteRate());
+ this.instance.put("5MinRate", value.getFiveMinuteRate());
+ this.instance.put("mean", value.getMeanRate());
+ this.instance.put("1MinRate", value.getOneMinuteRate());
return this;
}
}