You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:10 UTC
[09/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle
problems on eagle-alert module and enable failOnViolation
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 87faf82..7a93e72 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -19,6 +19,12 @@
package org.apache.eagle.alert.engine.runner;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
@@ -28,11 +34,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,24 +50,24 @@ import java.util.List;
*/
public class UnitTopologyRunner {
private static final Logger LOG = LoggerFactory.getLogger(UnitTopologyRunner.class);
- public final static String spoutName = "alertEngineSpout";
- private final static String streamRouterBoltNamePrefix = "streamRouterBolt";
- private final static String alertBoltNamePrefix = "alertBolt";
- public final static String alertPublishBoltName = "alertPublishBolt";
-
- public final static String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
- public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
- public final static String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
- public final static String ALERT_TASK_NUM = "topology.numOfAlertBolts";
- public final static String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
- public final static String LOCAL_MODE = "topology.localMode";
- public final static String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs";
- public final static int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
+ public static final String spoutName = "alertEngineSpout";
+ private static final String streamRouterBoltNamePrefix = "streamRouterBolt";
+ private static final String alertBoltNamePrefix = "alertBolt";
+ public static final String alertPublishBoltName = "alertPublishBolt";
+
+ public static final String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
+ public static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+ public static final String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
+ public static final String ALERT_TASK_NUM = "topology.numOfAlertBolts";
+ public static final String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
+ public static final String LOCAL_MODE = "topology.localMode";
+ public static final String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs";
+ public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
private final IMetadataChangeNotifyService metadataChangeNotifyService;
private backtype.storm.Config givenStormConfig = null;
- public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService){
+ public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService) {
this.metadataChangeNotifyService = metadataChangeNotifyService;
}
@@ -75,15 +77,15 @@ public class UnitTopologyRunner {
}
public StormTopology buildTopology(String topologyId,
- int numOfSpoutTasks,
- int numOfRouterBolts,
- int numOfAlertBolts,
- int numOfPublishTasks,
- Config config) {
+ int numOfSpoutTasks,
+ int numOfRouterBolts,
+ int numOfAlertBolts,
+ int numOfPublishTasks,
+ Config config) {
StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts];
AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts];
- AlertPublisherBolt publisherBolt;
+
TopologyBuilder builder = new TopologyBuilder();
@@ -93,27 +95,27 @@ public class UnitTopologyRunner {
builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
// construct StreamRouterBolt objects
- for(int i=0; i<numOfRouterBolts; i++){
+ for (int i = 0; i < numOfRouterBolts; i++) {
routerBolts[i] = new StreamRouterBolt(streamRouterBoltNamePrefix + i, config, getMetadataChangeNotifyService());
}
// construct AlertBolt objects
- for(int i=0; i<numOfAlertBolts; i++){
- alertBolts[i] = new AlertBolt(alertBoltNamePrefix+i, config, getMetadataChangeNotifyService());
+ for (int i = 0; i < numOfAlertBolts; i++) {
+ alertBolts[i] = new AlertBolt(alertBoltNamePrefix + i, config, getMetadataChangeNotifyService());
}
// construct AlertPublishBolt object
- publisherBolt = new AlertPublisherBolt(alertPublishBoltName, config, getMetadataChangeNotifyService());
+ AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublishBoltName, config, getMetadataChangeNotifyService());
// connect spout and router bolt, also define output streams for downstreaming alert bolt
- for(int i=0; i<numOfRouterBolts; i++){
+ for (int i = 0; i < numOfRouterBolts; i++) {
String boltName = streamRouterBoltNamePrefix + i;
// define output streams, which are based on
String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, boltName);
List<String> outputStreamIds = new ArrayList<>(numOfAlertBolts);
- for(int j=0; j<numOfAlertBolts; j++){
- String sid = StreamIdConversion.generateStreamIdBetween(boltName, alertBoltNamePrefix+j);
+ for (int j = 0; j < numOfAlertBolts; j++) {
+ String sid = StreamIdConversion.generateStreamIdBetween(boltName, alertBoltNamePrefix + j);
outputStreamIds.add(sid);
}
routerBolts[i].declareOutputStreams(outputStreamIds);
@@ -126,79 +128,79 @@ public class UnitTopologyRunner {
}
// connect router bolt and alert bolt, also define output streams for downstreaming alert publish bolt
- for(int i=0; i<numOfAlertBolts; i++){
+ for (int i = 0; i < numOfAlertBolts; i++) {
String boltName = alertBoltNamePrefix + i;
BoltDeclarer boltDeclarer = builder.setBolt(boltName, alertBolts[i]);
- for(int j=0; j<numOfRouterBolts; j++) {
- String streamId = StreamIdConversion.generateStreamIdBetween(streamRouterBoltNamePrefix+j, boltName);
- boltDeclarer.fieldsGrouping(streamRouterBoltNamePrefix+j, streamId, new Fields());
+ for (int j = 0; j < numOfRouterBolts; j++) {
+ String streamId = StreamIdConversion.generateStreamIdBetween(streamRouterBoltNamePrefix + j, boltName);
+ boltDeclarer.fieldsGrouping(streamRouterBoltNamePrefix + j, streamId, new Fields());
}
}
// connect alert bolt and alert publish bolt, this is the last bolt in the pipeline
BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt).setNumTasks(numOfPublishTasks);
- for(int i=0; i<numOfAlertBolts; i++) {
- boltDeclarer.fieldsGrouping(alertBoltNamePrefix+i, new Fields(AlertConstants.FIELD_0));
+ for (int i = 0; i < numOfAlertBolts; i++) {
+ boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0));
}
return builder.createTopology();
}
+ public StormTopology buildTopology(String topologyId, Config config) {
+ int numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks");
+ int numOfRouterBolts = config.getInt("topology.numOfRouterBolts");
+ int numOfAlertBolts = config.getInt("topology.numOfAlertBolts");
+ int numOfPublishTasks = config.getInt("topology.numOfPublishTasks");
+ return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+ }
+
private void run(String topologyId,
- int numOfTotalWorkers,
- int numOfSpoutTasks,
- int numOfRouterBolts,
- int numOfAlertBolts,
- int numOfPublishTasks,
- Config config,
- boolean localMode) {
+ int numOfTotalWorkers,
+ int numOfSpoutTasks,
+ int numOfRouterBolts,
+ int numOfAlertBolts,
+ int numOfPublishTasks,
+ Config config,
+ boolean localMode) {
backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig;
// TODO: Configurable metric consumer instance number
- int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS)?config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
- LOG.info("Set topology.message.timeout.secs as {}",messageTimeoutSecs);
+ int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
+ LOG.info("Set topology.message.timeout.secs as {}", messageTimeoutSecs);
stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);
- if(config.hasPath("metric")) {
- stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()),1);
+ if (config.hasPath("metric")) {
+ stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1);
}
stormConfig.setNumWorkers(numOfTotalWorkers);
StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
- if(localMode) {
+ if (localMode) {
LOG.info("Submitting as local mode");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyId, stormConfig, topology);
Utils.sleep(Long.MAX_VALUE);
- }else{
+ } else {
LOG.info("Submitting as cluster mode");
try {
StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology);
- } catch(Exception ex) {
+ } catch (Exception ex) {
LOG.error("fail submitting topology {}", topology, ex);
throw new IllegalStateException(ex);
}
}
}
- public void run(String topologyId,Config config) {
+ public void run(String topologyId, Config config) {
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
boolean localMode = config.getBoolean(LOCAL_MODE);
int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
- run(topologyId,numOfTotalWorkers, numOfSpoutTasks,numOfRouterBolts,numOfAlertBolts,numOfPublishTasks,config, localMode);
- }
-
- public StormTopology buildTopology(String topologyId,Config config) {
- int numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks");
- int numOfRouterBolts = config.getInt("topology.numOfRouterBolts");
- int numOfAlertBolts = config.getInt("topology.numOfAlertBolts");
- int numOfPublishTasks = config.getInt("topology.numOfPublishTasks");
- return buildTopology(topologyId,numOfSpoutTasks,numOfRouterBolts,numOfAlertBolts,numOfPublishTasks,config);
+ run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config, localMode);
}
public IMetadataChangeNotifyService getMetadataChangeNotifyService() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
index c1da90f..43df203 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
@@ -19,19 +19,18 @@
package org.apache.eagle.alert.engine.scheme;
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-
/**
- * Expects flat Json scheme
+ * Expects flat Json scheme.
*/
public class JsonScheme implements Scheme {
private static final long serialVersionUID = -8352896475656975577L;
@@ -54,16 +53,18 @@ public class JsonScheme implements Scheme {
@SuppressWarnings("rawtypes")
public List<Object> deserialize(byte[] ser) {
try {
- if(ser != null ) {
+ if (ser != null) {
Map map = mapper.readValue(ser, Map.class);
return Arrays.asList(topic, map);
- }else{
- if(LOG.isDebugEnabled()) LOG.debug("Content is null, ignore");
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Content is null, ignore");
+ }
}
} catch (IOException e) {
try {
LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e);
- }catch(Exception ex){
+ } catch (Exception ex) {
LOG.error(ex.getMessage(), ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
index 226dd84..4e02edb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
@@ -19,23 +19,22 @@
package org.apache.eagle.alert.engine.scheme;
-import java.util.Map;
-import java.util.Properties;
-
import org.apache.eagle.alert.coordination.model.StreamNameSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.Properties;
+
/**
* A strategy to get stream name from message tuple.
- *
- * Since 5/5/16.
+ * @since 5/5/16.
*/
public class JsonStringStreamNameSelector implements StreamNameSelector {
- private final static Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class);
- public final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
- public final static String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName";
- public final static String STREAM_NAME_FORMAT = "streamNameFormat";
+ private static final Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class);
+ public static final String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
+ public static final String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName";
+ public static final String STREAM_NAME_FORMAT = "streamNameFormat";
private String userProvidedStreamName;
private String[] fieldNamesToInferStreamName;
@@ -70,5 +69,5 @@ public class JsonStringStreamNameSelector implements StreamNameSelector {
}
return "defaultStringStream";
}
-
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
index 194b0c2..57c8897 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
@@ -19,21 +19,20 @@
package org.apache.eagle.alert.engine.scheme;
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
/**
- * used for parsing plain string
+ * Used for parsing plain strings.
*/
public class PlainStringScheme implements Scheme {
private static final long serialVersionUID = 5969724968671646310L;
@@ -42,7 +41,7 @@ public class PlainStringScheme implements Scheme {
private String topic;
@SuppressWarnings("rawtypes")
- public PlainStringScheme(String topic, Map conf){
+ public PlainStringScheme(String topic, Map conf) {
this.topic = topic;
}
@@ -57,7 +56,7 @@ public class PlainStringScheme implements Scheme {
return new Fields(STRING_SCHEME_KEY);
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings( {"unchecked", "rawtypes"})
@Override
public List<Object> deserialize(byte[] ser) {
Map m = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
index 61ec943..0b88483 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
@@ -19,29 +19,31 @@
package org.apache.eagle.alert.engine.scheme;
-import java.util.Map;
-import java.util.Properties;
-
import org.apache.eagle.alert.coordination.model.StreamNameSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.Properties;
+
/**
* Since 5/3/16.
*/
public class PlainStringStreamNameSelector implements StreamNameSelector {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(PlainStringStreamNameSelector.class);
- private final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
+ private static final String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
private static final String DEFAULT_STRING_STREAM_NAME = "defaultStringStream";
private String streamName;
- public PlainStringStreamNameSelector(Properties prop){
+ public PlainStringStreamNameSelector(Properties prop) {
streamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
- if(streamName == null)
+ if (streamName == null) {
streamName = DEFAULT_STRING_STREAM_NAME;
+ }
}
+
@Override
public String getStreamName(Map<String, Object> tuple) {
return streamName;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
index 03c1dfb..1e8f440 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
@@ -16,18 +16,18 @@
*/
package org.apache.eagle.alert.engine.serialization;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer;
import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
/**
- * TODO: Seams the complexity dosen't bring enough performance improve
+ * TODO: Seems the complexity doesn't bring enough performance improvement.
*
* @see PartitionedEvent
*/
@@ -36,7 +36,7 @@ public class PartitionedEventDigestSerializer implements Serializer<PartitionedE
private final Serializer<StreamEvent> streamEventSerializer;
private final Serializer<StreamPartition> streamPartitionSerializer;
- public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider){
+ public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider) {
this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
this.streamPartitionSerializer = StreamPartitionDigestSerializer.INSTANCE;
}
@@ -44,8 +44,8 @@ public class PartitionedEventDigestSerializer implements Serializer<PartitionedE
@Override
public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
dataOutput.writeLong(entity.getPartitionKey());
- streamEventSerializer.serialize(entity.getEvent(),dataOutput);
- streamPartitionSerializer.serialize(entity.getPartition(),dataOutput);
+ streamEventSerializer.serialize(entity.getEvent(), dataOutput);
+ streamPartitionSerializer.serialize(entity.getPartition(), dataOutput);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
index f653361..428ad34 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
@@ -16,24 +16,13 @@
*/
package org.apache.eagle.alert.engine.serialization;
-import java.io.IOException;
-
import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import java.io.IOException;
+
public interface PartitionedEventSerializer {
- /**
- *
- * @param entity
- * @return
- * @throws IOException
- */
+
byte[] serialize(PartitionedEvent entity) throws IOException;
- /**
- *
- * @param bytes
- * @return
- * @throws IOException
- */
PartitionedEvent deserialize(byte[] bytes) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
index 69bb695..42f0559 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
@@ -19,15 +19,13 @@ package org.apache.eagle.alert.engine.serialization;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
-import java.io.IOException;
-
/**
- * Integration interface to provide stream definition for serializer
+ * Integration interface to provide stream definition for serializer.
*/
public interface SerializationMetadataProvider {
/**
* @param streamId
- * @return StreamDefinition or null if not exist
+ * @return StreamDefinition or null if not exist.
*/
StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
index c2f87d0..599152e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
@@ -21,6 +21,7 @@ import java.io.DataOutput;
import java.io.IOException;
public interface Serializer<V> {
- void serialize(V value,DataOutput dataOutput) throws IOException;
+ void serialize(V value, DataOutput dataOutput) throws IOException;
+
V deserialize(DataInput dataInput) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
index 6be8f1a..a84b5dc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
@@ -16,49 +16,42 @@
*/
package org.apache.eagle.alert.engine.serialization;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.serialization.impl.*;
+
import java.util.HashMap;
import java.util.Map;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.serialization.impl.BooleanSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.DoubleSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.FloatSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.IntegerSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.JavaObjectSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.LongSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
-import org.apache.eagle.alert.engine.serialization.impl.StringSerializer;
-
public class Serializers {
- private final static Map<StreamColumn.Type,Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
+ private static final Map<StreamColumn.Type, Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
- public static <T> void register(StreamColumn.Type type,Serializer<T> serializer){
- if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){
- throw new IllegalArgumentException("Duplicated column type: "+type);
+ public static <T> void register(StreamColumn.Type type, Serializer<T> serializer) {
+ if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
+ throw new IllegalArgumentException("Duplicated column type: " + type);
}
- COLUMN_TYPE_SER_MAPPING.put(type,serializer);
+ COLUMN_TYPE_SER_MAPPING.put(type, serializer);
}
@SuppressWarnings("unchecked")
- public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type){
- if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){
+ public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type) {
+ if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type);
- }else{
- throw new IllegalArgumentException("Serializer of type: "+type+" not found");
+ } else {
+ throw new IllegalArgumentException("Serializer of type: " + type + " not found");
}
}
- public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider){
+ public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider) {
return new PartitionedEventSerializerImpl(metadataProvider);
}
static {
- register(StreamColumn.Type.STRING,new StringSerializer());
- register(StreamColumn.Type.INT,new IntegerSerializer());
- register(StreamColumn.Type.LONG,new LongSerializer());
- register(StreamColumn.Type.FLOAT,new FloatSerializer());
- register(StreamColumn.Type.DOUBLE,new DoubleSerializer());
- register(StreamColumn.Type.BOOL,new BooleanSerializer());
- register(StreamColumn.Type.OBJECT,new JavaObjectSerializer());
+ register(StreamColumn.Type.STRING, new StringSerializer());
+ register(StreamColumn.Type.INT, new IntegerSerializer());
+ register(StreamColumn.Type.LONG, new LongSerializer());
+ register(StreamColumn.Type.FLOAT, new FloatSerializer());
+ register(StreamColumn.Type.DOUBLE, new DoubleSerializer());
+ register(StreamColumn.Type.BOOL, new BooleanSerializer());
+ register(StreamColumn.Type.OBJECT, new JavaObjectSerializer());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
index db91a70..1e90569 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
index f2f5359..ad5f53c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
index e6b510a..18089a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
index f784456..d2473a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
index 39baf2b..14d9ea5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
@@ -16,14 +16,14 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+import org.apache.commons.lang3.SerializationUtils;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
public class JavaObjectSerializer implements Serializer<Object> {
@Override
public void serialize(Object value, DataOutput dataOutput) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
index 116b275..8d85c76 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
index 5a3d77d..2b0140f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
@@ -16,10 +16,6 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -27,23 +23,27 @@ import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
import org.apache.eagle.alert.engine.serialization.Serializer;
import org.apache.eagle.alert.engine.utils.CompressionUtils;
-
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
/**
* Stream Metadata Cached Serializer
*
- * Performance:
+ * <p> Performance:
*
* 1) VS Kryo Direct: reduce 73.4% space (bytes) and 42.5 % time (ms).
* 2) VS Java Native: reduce 92.5% space (bytes) and 94.2% time (ms)
+ * </p>
*
- * Tips:
- *
+ * <p>Tips:
* 1) Without-compression performs better than with compression for small event
+ * </p>
*
- * TODO: Cache Partition would save little space but almost half of serialization time, how to balance the performance?
+ * <p>TODO: Cache Partition would save little space but almost half of serialization time, how to balance the performance?</p>
*
* @see PartitionedEvent
*/
@@ -53,10 +53,10 @@ public class PartitionedEventSerializerImpl implements Serializer<PartitionedEve
private final boolean compress;
/**
- * @param serializationMetadataProvider metadata provider
- * @param compress false by default
+ * @param serializationMetadataProvider metadata provider.
+ * @param compress false by default.
*/
- public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider,boolean compress) {
+ public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider, boolean compress) {
this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
this.compress = compress;
@@ -76,6 +76,13 @@ public class PartitionedEventSerializerImpl implements Serializer<PartitionedEve
}
@Override
+ public byte[] serialize(PartitionedEvent entity) throws IOException {
+ ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
+ this.serialize(entity, dataOutput);
+ return compress ? CompressionUtils.compress(dataOutput.toByteArray()) : dataOutput.toByteArray();
+ }
+
+ @Override
public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
PartitionedEvent event = new PartitionedEvent();
event.setPartitionKey(dataInput.readLong());
@@ -87,15 +94,9 @@ public class PartitionedEventSerializerImpl implements Serializer<PartitionedEve
return event;
}
- @Override
- public byte[] serialize(PartitionedEvent entity) throws IOException {
- ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
- this.serialize(entity,dataOutput);
- return compress ? CompressionUtils.compress(dataOutput.toByteArray()):dataOutput.toByteArray();
- }
@Override
public PartitionedEvent deserialize(byte[] bytes) throws IOException {
- return this.deserialize(ByteStreams.newDataInput(compress ? CompressionUtils.decompress(bytes):bytes));
+ return this.deserialize(ByteStreams.newDataInput(compress ? CompressionUtils.decompress(bytes) : bytes));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
index e13b23f..d7119db 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
@@ -29,25 +29,22 @@ import java.io.IOException;
import java.util.BitSet;
/**
+ * StreamEventSerializer.
+ *
* @see StreamEvent
*/
public class StreamEventSerializer implements Serializer<StreamEvent> {
private final SerializationMetadataProvider serializationMetadataProvider;
- public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider){
+ public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider) {
this.serializationMetadataProvider = serializationMetadataProvider;
}
- /**
- *
- * @param objects
- * @return
- */
- private BitSet isNullBitSet(Object[] objects){
+ private BitSet isNullBitSet(Object[] objects) {
BitSet bitSet = new BitSet();
int i = 0;
- for(Object obj:objects){
- bitSet.set(i,obj == null);
+ for (Object obj : objects) {
+ bitSet.set(i, obj == null);
i++;
}
return bitSet;
@@ -58,28 +55,30 @@ public class StreamEventSerializer implements Serializer<StreamEvent> {
// Bryant: here "metaVersion/streamId" writes to dataOutputUTF
String metaVersion = event.getMetaVersion();
String streamId = event.getStreamId();
- String metaVersion_streamId = String.format("%s/%s", metaVersion, streamId);
+ String metaVersionStreamId = String.format("%s/%s", metaVersion, streamId);
- dataOutput.writeUTF(metaVersion_streamId);
+ dataOutput.writeUTF(metaVersionStreamId);
dataOutput.writeLong(event.getTimestamp());
- if(event.getData() == null || event.getData().length == 0){
+ if (event.getData() == null || event.getData().length == 0) {
dataOutput.writeInt(0);
- }else{
+ } else {
BitSet isNullIndex = isNullBitSet(event.getData());
byte[] isNullBytes = isNullIndex.toByteArray();
dataOutput.writeInt(isNullBytes.length);
dataOutput.write(isNullBytes);
- int i =0;
+ int i = 0;
StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
- if(definition == null) throw new IOException("StreamDefinition not found: "+event.getStreamId());
- if(event.getData().length != definition.getColumns().size()){
- throw new IOException("Event :"+event+" doesn't match with schema: "+definition);
+ if (definition == null) {
+ throw new IOException("StreamDefinition not found: " + event.getStreamId());
+ }
+ if (event.getData().length != definition.getColumns().size()) {
+ throw new IOException("Event :" + event + " doesn't match with schema: " + definition);
}
- for(StreamColumn column:definition.getColumns()){
- if(!isNullIndex.get(i)) {
- Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i],dataOutput);
+ for (StreamColumn column : definition.getColumns()) {
+ if (!isNullIndex.get(i)) {
+ Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i], dataOutput);
}
- i ++;
+ i++;
}
}
}
@@ -87,9 +86,9 @@ public class StreamEventSerializer implements Serializer<StreamEvent> {
@Override
public StreamEvent deserialize(DataInput dataInput) throws IOException {
StreamEvent event = new StreamEvent();
- String metaVersion_streamId = dataInput.readUTF();
- String streamId = metaVersion_streamId.split("/")[1];
- String metaVersion = metaVersion_streamId.split("/")[0];
+ String metaVersionStreamId = dataInput.readUTF();
+ String streamId = metaVersionStreamId.split("/")[1];
+ String metaVersion = metaVersionStreamId.split("/")[0];
event.setStreamId(streamId);
event.setMetaVersion(metaVersion);
@@ -101,11 +100,11 @@ public class StreamEventSerializer implements Serializer<StreamEvent> {
BitSet isNullIndex = BitSet.valueOf(isNullBytes);
Object[] attributes = new Object[definition.getColumns().size()];
int i = 0;
- for(StreamColumn column:definition.getColumns()){
- if(!isNullIndex.get(i)) {
+ for (StreamColumn column : definition.getColumns()) {
+ if (!isNullIndex.get(i)) {
attributes[i] = Serializers.getColumnSerializer(column.getType()).deserialize(dataInput);
}
- i ++;
+ i++;
}
event.setData(attributes);
return event;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
index f35da39..6a47f1e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
@@ -16,39 +16,35 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.*;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
/**
- * Don't serialize streamId
+ * Don't serialize streamId.
*
* @see StreamPartition
*/
public class StreamPartitionDigestSerializer implements Serializer<StreamPartition> {
- public final static StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer();
+ public static final StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer();
- private final Map<DigestBytes,StreamPartition> checkSumPartitionMap = new HashMap<>();
- private final Map<StreamPartition,DigestBytes> partitionCheckSumMap = new HashMap<>();
+ private final Map<DigestBytes, StreamPartition> checkSumPartitionMap = new HashMap<>();
+ private final Map<StreamPartition, DigestBytes> partitionCheckSumMap = new HashMap<>();
@Override
public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
DigestBytes checkSum = partitionCheckSumMap.get(partition);
- if(checkSum == null){
+ if (checkSum == null) {
try {
checkSum = digestCheckSum(partition);
- partitionCheckSumMap.put(partition,checkSum);
- checkSumPartitionMap.put(checkSum,partition);
+ partitionCheckSumMap.put(partition, checkSum);
+ checkSumPartitionMap.put(checkSum, partition);
} catch (NoSuchAlgorithmException e) {
throw new IOException(e);
}
@@ -63,8 +59,8 @@ public class StreamPartitionDigestSerializer implements Serializer<StreamPartiti
byte[] checksum = new byte[checkSumLen];
dataInput.readFully(checksum);
StreamPartition partition = checkSumPartitionMap.get(new DigestBytes(checksum));
- if(partition == null){
- throw new IOException("Illegal partition checksum: "+checksum);
+ if (partition == null) {
+ throw new IOException("Illegal partition checksum: " + checksum);
}
return partition;
}
@@ -72,7 +68,7 @@ public class StreamPartitionDigestSerializer implements Serializer<StreamPartiti
private class DigestBytes {
private final byte[] data;
- public DigestBytes(byte[] bytes){
+ public DigestBytes(byte[] bytes) {
this.data = bytes;
}
@@ -85,10 +81,12 @@ public class StreamPartitionDigestSerializer implements Serializer<StreamPartiti
public int hashCode() {
return Arrays.hashCode(data);
}
- public int size(){
+
+ public int size() {
return data.length;
}
- public byte[] toByteArray(){
+
+ public byte[] toByteArray() {
return data;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
index 4105277..411368f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
@@ -16,38 +16,38 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
/**
- * Don't serialize streamId
+ * Don't serialize streamId.
*
* @see StreamPartition
*/
public class StreamPartitionSerializer implements Serializer<StreamPartition> {
- public final static StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer();
+ public static final StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer();
@Override
public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(partition.getType().toString());
- if(partition.getColumns() == null || partition.getColumns().size() == 0){
+ if (partition.getColumns() == null || partition.getColumns().size() == 0) {
dataOutput.writeInt(0);
} else {
dataOutput.writeInt(partition.getColumns().size());
- for(String column:partition.getColumns()){
+ for (String column : partition.getColumns()) {
dataOutput.writeUTF(column);
}
}
- if(partition.getSortSpec() == null){
+ if (partition.getSortSpec() == null) {
dataOutput.writeByte(0);
- }else {
+ } else {
dataOutput.writeByte(1);
dataOutput.writeUTF(partition.getSortSpec().getWindowPeriod());
dataOutput.writeInt(partition.getSortSpec().getWindowMargin());
@@ -59,14 +59,14 @@ public class StreamPartitionSerializer implements Serializer<StreamPartition> {
StreamPartition partition = new StreamPartition();
partition.setType(StreamPartition.Type.locate(dataInput.readUTF()));
int colSize = dataInput.readInt();
- if(colSize>0){
+ if (colSize > 0) {
List<String> columns = new ArrayList<>(colSize);
- for(int i=0;i<colSize;i++){
+ for (int i = 0; i < colSize; i++) {
columns.add(dataInput.readUTF());
}
partition.setColumns(columns);
}
- if(dataInput.readByte() == 1){
+ if (dataInput.readByte() == 1) {
String period = dataInput.readUTF();
int margin = dataInput.readInt();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
index 940024d..2a1541a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
index b705564..599f349 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
@@ -18,8 +18,7 @@
*/
package org.apache.eagle.alert.engine.siddhi.extension;
-import java.util.LinkedList;
-
+import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
@@ -28,11 +27,10 @@ import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggrega
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
-import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
/**
- * @since Apr 1, 2016
- *
+ * @since Apr 1, 2016.
*/
public class AttributeCollectAggregator extends AttributeAggregator {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
index 43400c7..101d05b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
@@ -16,8 +16,7 @@
*/
package org.apache.eagle.alert.engine.siddhi.extension;
-import java.util.LinkedList;
-
+import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
@@ -26,7 +25,7 @@ import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggrega
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
-import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
public class AttributeCollectWithDistinctAggregator extends AttributeAggregator {
@@ -73,11 +72,11 @@ public class AttributeCollectWithDistinctAggregator extends AttributeAggregator
@Override
public Object processAdd(Object arg0) {
- // AttributeAggregator.process is already synchronized
- if (value.contains(arg0)) {
- value.remove(arg0);
- }
- value.add(arg0);
+ // AttributeAggregator.process is already synchronized
+ if (value.contains(arg0)) {
+ value.remove(arg0);
+ }
+ value.add(arg0);
if (LOG.isDebugEnabled()) {
LOG.debug("processAdd: current values are : " + value);
}
@@ -86,11 +85,11 @@ public class AttributeCollectWithDistinctAggregator extends AttributeAggregator
@Override
public Object processAdd(Object[] arg0) {
- // AttributeAggregator.process is already synchronized
- if (value.contains(arg0)) {
- value.remove(arg0);
- }
- value.add(arg0);
+ // AttributeAggregator.process is already synchronized
+ if (value.contains(arg0)) {
+ value.remove(arg0);
+ }
+ value.add(arg0);
if (LOG.isDebugEnabled()) {
LOG.debug("processAdd: current values are : " + value);
}
@@ -120,5 +119,5 @@ public class AttributeCollectWithDistinctAggregator extends AttributeAggregator
value.clear();
return value;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
index 1bd24ed..27df63b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
@@ -31,16 +31,16 @@ public class ContainsIgnoreCaseExtension extends FunctionExecutor {
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, " +
- "but found " + attributeExpressionExecutors.length);
+ throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, "
+ + "but found " + attributeExpressionExecutors.length);
}
if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+ throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, "
+ + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
}
if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+ throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, "
+ + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
}
}
@@ -52,8 +52,8 @@ public class ContainsIgnoreCaseExtension extends FunctionExecutor {
if (data[1] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
}
- String str1 = (String)data[0];
- String str2 = (String)data[1];
+ String str1 = (String) data[0];
+ String str2 = (String) data[1];
return str1.toUpperCase().contains(str2.toUpperCase());
}
@@ -79,7 +79,7 @@ public class ContainsIgnoreCaseExtension extends FunctionExecutor {
@Override
public Object[] currentState() {
- return new Object[]{};
+ return new Object[] {};
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
index e99c4b9..1292e05 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
@@ -19,15 +19,11 @@ package org.apache.eagle.alert.engine.siddhi.extension;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
public class EqualsIgnoreCaseExtension extends FunctionExecutor {
Attribute.Type returnType = Attribute.Type.BOOL;
@@ -35,16 +31,16 @@ public class EqualsIgnoreCaseExtension extends FunctionExecutor {
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, " +
- "but found " + attributeExpressionExecutors.length);
+ throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, "
+ + "but found " + attributeExpressionExecutors.length);
}
if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+ throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, "
+ + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
}
if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+ throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, "
+ + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
}
}
@@ -56,8 +52,8 @@ public class EqualsIgnoreCaseExtension extends FunctionExecutor {
if (data[1] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
}
- String str1 = (String)data[0];
- String str2 = (String)data[1];
+ String str1 = (String) data[0];
+ String str2 = (String) data[1];
return str1.equalsIgnoreCase(str2);
}
@@ -83,7 +79,7 @@ public class EqualsIgnoreCaseExtension extends FunctionExecutor {
@Override
public Object[] currentState() {
- return new Object[]{};
+ return new Object[] {};
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
index d384d47..fe2280f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
@@ -44,18 +44,18 @@ public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, " +
- "but found " + attributeExpressionExecutors.length);
+ throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, "
+ + "but found " + attributeExpressionExecutors.length);
}
if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+ throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, "
+ + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
}
if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, " +
- "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+ throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, "
+ + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
}
- if(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor){
+ if (attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor) {
isRegexConstant = true;
regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
@@ -76,7 +76,7 @@ public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
}
String source = (String) data[0];
- if(!isRegexConstant){
+ if (!isRegexConstant) {
regex = (String) data[1];
pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
matcher = pattern.matcher(source);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
index 38c5c30..0f75b6a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
@@ -16,18 +16,18 @@
*/
package org.apache.eagle.alert.engine.sorter;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
- * TODO: Make sure thread-safe
- * TODO: Leverage Off-Heap Memory to persist append-only events collection
+ * TODO: Make sure this class is thread-safe.
+ * TODO: Leverage off-heap memory to persist the append-only events collection.
*/
public abstract class BaseStreamWindow implements StreamWindow {
private final long endTime;
@@ -35,22 +35,17 @@ public abstract class BaseStreamWindow implements StreamWindow {
private final long margin;
private final AtomicBoolean expired;
private final long createdTime;
- private final static Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
+ private static final Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
private PartitionedEventCollector collector;
private final AtomicLong lastFlushedStreamTime;
private final AtomicLong lastFlushedSystemTime;
- /**
- * @param startTime
- * @param endTime
- * @param marginTime
- */
- public BaseStreamWindow(long startTime, long endTime, long marginTime){
- if(startTime >= endTime){
- throw new IllegalArgumentException("startTime: "+startTime+" >= endTime: "+endTime+", expected: startTime < endTime");
+ public BaseStreamWindow(long startTime, long endTime, long marginTime) {
+ if (startTime >= endTime) {
+ throw new IllegalArgumentException("startTime: " + startTime + " >= endTime: " + endTime + ", expected: startTime < endTime");
}
- if(marginTime > endTime - startTime){
- throw new IllegalArgumentException("marginTime: "+marginTime+" > endTime: "+endTime+" - startTime "+startTime+", expected: marginTime < endTime - startTime");
+ if (marginTime > endTime - startTime) {
+ throw new IllegalArgumentException("marginTime: " + marginTime + " > endTime: " + endTime + " - startTime " + startTime + ", expected: marginTime < endTime - startTime");
}
this.startTime = startTime;
this.endTime = endTime;
@@ -63,8 +58,9 @@ public abstract class BaseStreamWindow implements StreamWindow {
@Override
public void register(PartitionedEventCollector collector) {
- if(this.collector!=null)
+ if (this.collector != null) {
throw new IllegalArgumentException("Duplicated collector error");
+ }
this.collector = collector;
}
@@ -78,7 +74,7 @@ public abstract class BaseStreamWindow implements StreamWindow {
}
@Override
- public long rejectTime(){
+ public long rejectTime() {
return this.lastFlushedStreamTime.get();
}
@@ -93,7 +89,7 @@ public abstract class BaseStreamWindow implements StreamWindow {
public boolean accept(final long eventTime) {
return !expired() && eventTime >= startTime && eventTime < endTime
- && eventTime >= lastFlushedStreamTime.get(); // dropped
+ && eventTime >= lastFlushedStreamTime.get(); // dropped
}
public boolean expired() {
@@ -106,35 +102,38 @@ public abstract class BaseStreamWindow implements StreamWindow {
}
/**
- *
- *
* Expire when
* 1) If stream time >= endTime + marginTime, then flush and expire
* 2) If systemTime - flushedTime >= endTime - startTime + marginTime && window is not empty && streamTime >= endTime, then flush and expire.
* 3) If systemTime - flushedTime >= endTime - startTime + marginTime && window is not empty && streamTime < endTime, then flush but not expire.
* 4) else do nothing
*
- * @param clock stream time clock
- * @param globalSystemTime system time clock
+ * @param clock stream time clock
+ * @param globalSystemTime system time clock
*/
@Override
- public synchronized void onTick(StreamTimeClock clock,long globalSystemTime) {
- if(!expired()) {
- if(clock.getTime() >= endTime + margin){
- LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}",clock.getStreamId(),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()),globalSystemTime - lastFlushedSystemTime.get(),this);
+ public synchronized void onTick(StreamTimeClock clock, long globalSystemTime) {
+ if (!expired()) {
+ if (clock.getTime() >= endTime + margin) {
+ LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}", clock.getStreamId(),
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
lastFlushedStreamTime.set(clock.getTime());
lastFlushedSystemTime.set(globalSystemTime);
flush();
expired.set(true);
- } else if(globalSystemTime - lastFlushedSystemTime.get() >= endTime + margin - startTime && size() > 0){
- LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}",clock.getStreamId(),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()),globalSystemTime - lastFlushedSystemTime.get(),this);
+ } else if (globalSystemTime - lastFlushedSystemTime.get() >= endTime + margin - startTime && size() > 0) {
+ LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}", clock.getStreamId(),
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
lastFlushedStreamTime.set(clock.getTime());
lastFlushedSystemTime.set(globalSystemTime);
flush();
- if(lastFlushedStreamTime.get()>=this.endTime) expired.set(true);
+ if (lastFlushedStreamTime.get() >= this.endTime) {
+ expired.set(true);
+ }
}
} else {
- LOG.warn("Window has already expired, should not tick: {}",this.toString());
+ LOG.warn("Window has already expired, should not tick: {}", this.toString());
}
}
@@ -150,7 +149,7 @@ public abstract class BaseStreamWindow implements StreamWindow {
@Override
public boolean equals(Object obj) {
- if(obj !=null && obj instanceof BaseStreamWindow) {
+ if (obj != null && obj instanceof BaseStreamWindow) {
BaseStreamWindow another = (BaseStreamWindow) obj;
return another.startTime == this.startTime && another.endTime == this.endTime && another.margin == this.margin;
}
@@ -158,26 +157,27 @@ public abstract class BaseStreamWindow implements StreamWindow {
}
@Override
- public void flush(){
- if(this.collector == null) throw new NullPointerException("Collector is not given before window flush");
+ public void flush() {
+ if (this.collector == null) {
+ throw new NullPointerException("Collector is not given before window flush");
+ }
this.flush(collector);
}
/**
- *
- * @param collector
- * @return max timestamp
+ * Flush this window through the given collector; nothing is returned.
+ * @param collector PartitionedEventCollector.
*/
protected abstract void flush(PartitionedEventCollector collector);
@Override
public String toString() {
return String.format("StreamWindow[period=[%s,%s), margin=%s ms, size=%s, reject=%s]",
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
- this.margin,
- size(),
- this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime): DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
- );
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
+ this.margin,
+ size(),
+ this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime) : DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
+ );
}
}
\ No newline at end of file