You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/30 01:38:29 UTC
[5/5] incubator-eagle git commit: [EAGLE-806] Integrate Metric
Process and Persistence with Application Framework
[EAGLE-806] Integrate Metric Process and Persistence with Application Framework
Integrate Stream Source and Metric Persistence with Application Framework
- Provide an API to easily ingest, process, aggregate and persist metrics
- Integrate stream source
- Integrate metric definition and metric persistence.
- Implement a basic pluggable aggregation abstraction for later use.
- Provide `CEPFunction` as default `TransformFunction` based on `SiddhiCEP`
Author: Hao Chen <ha...@apache.org>
Closes #692 from haoch/streamSourceAndPersist.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/10572c29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/10572c29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/10572c29
Branch: refs/heads/master
Commit: 10572c296f58532897cdea4f80132d1420890679
Parents: ff22537
Author: Hao Chen <ha...@apache.org>
Authored: Wed Nov 30 09:38:11 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Nov 30 09:38:11 2016 +0800
----------------------------------------------------------------------
.../alert/engine/coordinator/StreamColumn.java | 408 ++++++-------
.../engine/coordinator/StreamDefinition.java | 288 ++++-----
eagle-core/eagle-app/eagle-app-base/pom.xml | 11 +
.../app/environment/AbstractEnvironment.java | 30 +-
.../eagle/app/environment/Environment.java | 6 +-
.../environment/builder/AggregateFunction.java | 67 +++
.../environment/builder/ApplicationBuilder.java | 122 ++++
.../app/environment/builder/CEPFunction.java | 92 +++
.../app/environment/builder/Collector.java | 23 +
.../app/environment/builder/MaxFunction.java | 48 ++
.../environment/builder/MetricDefinition.java | 165 ++++++
.../builder/StormOutputCollector.java | 36 ++
.../environment/builder/TransformFunction.java | 30 +
.../builder/TransformFunctionBolt.java | 66 +++
.../app/environment/impl/StormEnvironment.java | 34 +-
.../app/messaging/DefaultStreamSinkConfig.java | 42 ++
.../eagle/app/messaging/FlattenEventMapper.java | 60 ++
.../apache/eagle/app/messaging/JsonSchema.java | 66 +++
.../app/messaging/KafkaStreamProvider.java | 130 +++++
.../eagle/app/messaging/KafkaStreamSink.java | 97 ++++
.../app/messaging/KafkaStreamSinkConfig.java | 115 ++++
.../eagle/app/messaging/KafkaStreamSource.java | 162 ++++++
.../app/messaging/KafkaStreamSourceConfig.java | 153 +++++
.../app/messaging/MetricStreamPersist.java | 152 +++++
.../eagle/app/messaging/StormStreamSink.java | 71 +++
.../eagle/app/messaging/StormStreamSource.java | 25 +
.../eagle/app/messaging/StreamEventMapper.java | 35 ++
.../eagle/app/messaging/StreamProvider.java | 50 ++
.../apache/eagle/app/messaging/StreamSink.java | 24 +
.../eagle/app/messaging/StreamSource.java | 23 +
.../eagle/app/service/ApplicationAction.java | 24 +-
.../impl/ApplicationProviderSPILoader.java | 2 +-
.../eagle/app/sink/DefaultStreamSinkConfig.java | 42 --
.../eagle/app/sink/FlattenEventMapper.java | 60 --
.../apache/eagle/app/sink/KafkaStreamSink.java | 144 -----
.../eagle/app/sink/KafkaStreamSinkConfig.java | 109 ----
.../eagle/app/sink/LoggingStreamSink.java | 55 --
.../apache/eagle/app/sink/StormStreamSink.java | 93 ---
.../eagle/app/sink/StreamEventMapper.java | 35 --
.../org/apache/eagle/app/sink/StreamSink.java | 24 -
.../eagle/app/sink/StreamSinkProvider.java | 42 --
.../eagle/app/spi/ApplicationProvider.java | 2 +-
.../eagle/app/utils/StreamConvertHelper.java | 51 ++
.../apache/eagle/app/TestStormApplication.java | 5 +-
.../app/environment/StaticEnvironmentTest.java | 2 +-
.../eagle/app/storm/MockStormApplication.java | 2 +-
.../eagle/app/stream/CEPFunctionTest.java | 50 ++
.../src/test/resources/application.conf | 5 +-
.../org/apache/eagle/common/utils/Tuple2.java | 42 ++
.../apache/eagle/metadata/model/StreamDesc.java | 21 +-
.../metadata/model/StreamSourceConfig.java | 27 +
.../src/test/resources/application-test.xml | 17 +
.../jdbc/provider/JDBCDataSourceProvider.java | 2 +-
.../ApplicationEntityServiceJDBCImpl.java | 4 +-
.../service/client/EagleServiceConnector.java | 3 +-
.../client/impl/EagleServiceClientImpl.java | 15 +-
.../app/example/ExampleStormApplication.java | 2 +-
.../src/test/resources/application.conf | 2 +-
.../org/apache/eagle/gc/GCLogApplication.java | 2 +-
eagle-hadoop-metric/pom.xml | 16 +-
.../eagle/metric/HadoopMetricMonitorApp.java | 38 ++
.../metric/HadoopMetricMonitorAppProdiver.java | 12 +-
...le.metric.HadoopMetricMonitorAppProdiver.xml | 4 +-
.../HadoopMetricMonitorAppProdiverTest.java | 84 ---
.../metric/HadoopMetricMonitorAppDebug.java | 23 +
.../HadoopMetricMonitorAppProviderTest.java | 84 +++
.../eagle/metric/SendSampleDataToKafka.java | 55 ++
.../src/test/resources/application.conf | 49 ++
.../resources/hadoop_jmx_metric_sample.json | 8 +
...adoopQueueRunningApplicationHealthCheck.java | 204 +++----
.../jpm/aggregation/storm/AggregationSpout.java | 2 +-
.../jpm/mr/history/MRHistoryJobApplication.java | 9 +-
.../jpm/mr/running/MRRunningJobApplication.java | 4 +-
.../src/test/resources/mrconf_30784.xml | 2 +-
.../SparkHistoryJobApplicationHealthCheck.java | 198 +++----
.../jpm/spark/running/SparkRunningJobApp.java | 4 +-
.../hbase/HBaseAuditLogApplication.java | 2 +-
.../AbstractHdfsAuditLogApplication.java | 4 +-
.../hive/HiveQueryMonitoringApplication.java | 2 +-
.../oozie/parse/OozieAuditLogApplication.java | 2 +-
eagle-server-assembly/src/main/conf/eagle.conf | 4 +-
.../src/main/resources/application.conf | 4 +-
.../apache/eagle/topology/TopologyCheckApp.java | 2 +-
.../TopologyCheckApplicationHealthCheck.java | 218 +++----
.../hbase/HbaseTopologyEntityParser.java | 330 +++++------
.../hdfs/HdfsTopologyEntityParser.java | 579 +++++++++----------
.../extractor/mr/MRTopologyEntityParser.java | 438 +++++++-------
.../topology/storm/TopologyDataPersistBolt.java | 400 ++++++-------
.../entity/HdfsServiceTopologyAPIEntity.java | 246 ++++----
89 files changed, 4118 insertions(+), 2424 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 4628043..9705efc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -1,205 +1,205 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-public class StreamColumn implements Serializable {
-
- private static final long serialVersionUID = -5457861313624389106L;
- private String name;
- private Type type;
- private Object defaultValue;
- private boolean required;
- private String description;
- private String nodataExpression;
-
- public String toString() {
- return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]",
- name, type, defaultValue, required, nodataExpression);
- }
-
- public String getNodataExpression() {
- return nodataExpression;
- }
-
- public void setNodataExpression(String nodataExpression) {
- this.nodataExpression = nodataExpression;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- @XmlJavaTypeAdapter(StreamColumnTypeAdapter.class)
- public Type getType() {
- return type;
- }
-
- public void setType(Type type) {
- this.type = type;
- }
-
- @XmlJavaTypeAdapter(value = DefaultValueAdapter.class)
- public Object getDefaultValue() {
- return defaultValue;
- }
-
- private void ensureDefaultValueType() {
- if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) {
- switch (this.getType()) {
- case INT:
- this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue()));
- break;
- case LONG:
- this.setDefaultValue(Long.valueOf((String) this.getDefaultValue()));
- break;
- case FLOAT:
- this.setDefaultValue(Float.valueOf((String) this.getDefaultValue()));
- break;
- case DOUBLE:
- this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
- break;
- case BOOL:
- this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue()));
- break;
- case OBJECT:
- try {
- this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class));
- } catch (IOException e) {
- throw new IllegalArgumentException(e);
- }
- break;
- default:
- throw new IllegalArgumentException("Illegal type: " + this.getType());
- }
- }
- }
-
- public void setDefaultValue(Object defaultValue) {
- this.defaultValue = defaultValue;
- ensureDefaultValueType();
- }
-
- public boolean isRequired() {
- return required;
- }
-
- public void setRequired(boolean required) {
- this.required = required;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public enum Type implements Serializable {
- STRING("string"), INT("int"), LONG("long"), FLOAT("float"), DOUBLE("double"), BOOL("bool"), OBJECT("object");
-
- private final String name;
-
- Type(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- @com.fasterxml.jackson.annotation.JsonCreator
- public static Type getEnumFromValue(String value) {
- for (Type testEnum : values()) {
- if (testEnum.name.equalsIgnoreCase(value)) {
- return testEnum;
- }
- }
- throw new IllegalArgumentException();
- }
- }
-
- public static class StreamColumnTypeAdapter extends XmlAdapter<String, Type> {
-
- @Override
- public Type unmarshal(String v) throws Exception {
- return Type.getEnumFromValue(v);
- }
-
- @Override
- public String marshal(Type v) throws Exception {
- return v.name;
- }
- }
-
- public static class DefaultValueAdapter extends XmlAdapter<String, Object> {
- @Override
- public Object unmarshal(String v) throws Exception {
- return v;
- }
-
- @Override
- public String marshal(Object v) throws Exception {
- return v.toString();
- }
- }
-
- public static class Builder {
- private StreamColumn column;
-
- public Builder() {
- column = new StreamColumn();
- }
-
- public Builder name(String name) {
- column.setName(name);
- return this;
- }
-
- public Builder type(Type type) {
- column.setType(type);
- return this;
- }
-
- public Builder defaultValue(Object defaultValue) {
- column.setDefaultValue(defaultValue);
- return this;
- }
-
- public Builder required(boolean required) {
- column.setRequired(required);
- return this;
- }
-
- public StreamColumn build() {
- return column;
- }
- }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+public class StreamColumn implements Serializable {
+
+ private static final long serialVersionUID = -5457861313624389106L;
+ private String name;
+ private Type type;
+ private Object defaultValue;
+ private boolean required;
+ private String description;
+ private String nodataExpression;
+
+ public String toString() {
+ return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]",
+ name, type, defaultValue, required, nodataExpression);
+ }
+
+ public String getNodataExpression() {
+ return nodataExpression;
+ }
+
+ public void setNodataExpression(String nodataExpression) {
+ this.nodataExpression = nodataExpression;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @XmlJavaTypeAdapter(StreamColumnTypeAdapter.class)
+ public Type getType() {
+ return type;
+ }
+
+ public void setType(Type type) {
+ this.type = type;
+ }
+
+ @XmlJavaTypeAdapter(value = DefaultValueAdapter.class)
+ public Object getDefaultValue() {
+ return defaultValue;
+ }
+
+ private void ensureDefaultValueType() {
+ if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) {
+ switch (this.getType()) {
+ case INT:
+ this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue()));
+ break;
+ case LONG:
+ this.setDefaultValue(Long.valueOf((String) this.getDefaultValue()));
+ break;
+ case FLOAT:
+ this.setDefaultValue(Float.valueOf((String) this.getDefaultValue()));
+ break;
+ case DOUBLE:
+ this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
+ break;
+ case BOOL:
+ this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue()));
+ break;
+ case OBJECT:
+ try {
+ this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal type: " + this.getType());
+ }
+ }
+ }
+
+ public void setDefaultValue(Object defaultValue) {
+ this.defaultValue = defaultValue;
+ ensureDefaultValueType();
+ }
+
+ public boolean isRequired() {
+ return required;
+ }
+
+ public void setRequired(boolean required) {
+ this.required = required;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public enum Type implements Serializable {
+ STRING("string"), INT("int"), LONG("long"), FLOAT("float"), DOUBLE("double"), BOOL("bool"), OBJECT("object");
+
+ private final String name;
+
+ Type(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @com.fasterxml.jackson.annotation.JsonCreator
+ public static Type getEnumFromValue(String value) {
+ for (Type testEnum : values()) {
+ if (testEnum.name.equalsIgnoreCase(value)) {
+ return testEnum;
+ }
+ }
+ throw new IllegalArgumentException();
+ }
+ }
+
+ public static class StreamColumnTypeAdapter extends XmlAdapter<String, Type> {
+
+ @Override
+ public Type unmarshal(String v) throws Exception {
+ return Type.getEnumFromValue(v);
+ }
+
+ @Override
+ public String marshal(Type v) throws Exception {
+ return v.name;
+ }
+ }
+
+ public static class DefaultValueAdapter extends XmlAdapter<String, Object> {
+ @Override
+ public Object unmarshal(String v) throws Exception {
+ return v;
+ }
+
+ @Override
+ public String marshal(Object v) throws Exception {
+ return v.toString();
+ }
+ }
+
+ public static class Builder {
+ private StreamColumn column;
+
+ public Builder() {
+ column = new StreamColumn();
+ }
+
+ public Builder name(String name) {
+ column.setName(name);
+ return this;
+ }
+
+ public Builder type(Type type) {
+ column.setType(type);
+ return this;
+ }
+
+ public Builder defaultValue(Object defaultValue) {
+ column.setDefaultValue(defaultValue);
+ return this;
+ }
+
+ public Builder required(boolean required) {
+ column.setRequired(required);
+ return this;
+ }
+
+ public StreamColumn build() {
+ return column;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
index e0789f9..1be36f3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -1,145 +1,145 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlElementWrapper;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This is actually a data source schema.
- *
- * @since Apr 5, 2016
- */
-public class StreamDefinition implements Serializable {
- private static final long serialVersionUID = 2352202882328931825L;
-
- // Stream unique ID
- private String streamId;
-
- // Stream description
- private String description;
-
- // Is validateable or not
- private boolean validate;
-
- // Is timeseries-based stream or not
- private boolean timeseries;
-
- // TODO: Decouple dataSource and siteId from stream definition
-
- // Stream data source ID
- private String dataSource;
-
- // Tenant (Site) ID
- private String siteId;
-
- private List<StreamColumn> columns = new ArrayList<>();
-
- public String toString() {
- return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s",
- streamId,
- dataSource,
- description,
- validate,
- timeseries,
- columns);
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public void setStreamId(String streamId) {
- this.streamId = streamId;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public boolean isValidate() {
- return validate;
- }
-
- public void setValidate(boolean validate) {
- this.validate = validate;
- }
-
- public boolean isTimeseries() {
- return timeseries;
- }
-
- public void setTimeseries(boolean timeseries) {
- this.timeseries = timeseries;
- }
-
- @XmlElementWrapper(name = "columns")
- @XmlElement(name = "column")
- public List<StreamColumn> getColumns() {
- return columns;
- }
-
- public void setColumns(List<StreamColumn> columns) {
- this.columns = columns;
- }
-
- public String getDataSource() {
- return dataSource;
- }
-
- public void setDataSource(String dataSource) {
- this.dataSource = dataSource;
- }
-
- public int getColumnIndex(String column) {
- int i = 0;
- for (StreamColumn col : this.getColumns()) {
- if (col.getName().equals(column)) {
- return i;
- }
- i++;
- }
- return -1;
- }
-
- public String getSiteId() {
- return siteId;
- }
-
- public void setSiteId(String siteId) {
- this.siteId = siteId;
- }
-
- public StreamDefinition copy() {
- StreamDefinition copied = new StreamDefinition();
- copied.setColumns(this.getColumns());
- copied.setDataSource(this.getDataSource());
- copied.setDescription(this.getDescription());
- copied.setSiteId(this.getSiteId());
- copied.setStreamId(this.getStreamId());
- copied.setTimeseries(this.isTimeseries());
- copied.setValidate(this.isValidate());
- return copied;
- }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is actually a data source schema.
+ *
+ * @since Apr 5, 2016
+ */
+public class StreamDefinition implements Serializable {
+ private static final long serialVersionUID = 2352202882328931825L;
+
+ // Stream unique ID
+ private String streamId;
+
+ // Stream description
+ private String description;
+
+ // Is validateable or not
+ private boolean validate;
+
+ // Is timeseries-based stream or not
+ private boolean timeseries;
+
+ // TODO: Decouple dataSource and siteId from stream definition
+
+ // Stream data source ID
+ private String dataSource;
+
+ // Tenant (Site) ID
+ private String siteId;
+
+ private List<StreamColumn> columns = new ArrayList<>();
+
+ public String toString() {
+ return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s",
+ streamId,
+ dataSource,
+ description,
+ validate,
+ timeseries,
+ columns);
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public boolean isValidate() {
+ return validate;
+ }
+
+ public void setValidate(boolean validate) {
+ this.validate = validate;
+ }
+
+ public boolean isTimeseries() {
+ return timeseries;
+ }
+
+ public void setTimeseries(boolean timeseries) {
+ this.timeseries = timeseries;
+ }
+
+ @XmlElementWrapper(name = "columns")
+ @XmlElement(name = "column")
+ public List<StreamColumn> getColumns() {
+ return columns;
+ }
+
+ public void setColumns(List<StreamColumn> columns) {
+ this.columns = columns;
+ }
+
+ public String getDataSource() {
+ return dataSource;
+ }
+
+ public void setDataSource(String dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public int getColumnIndex(String column) {
+ int i = 0;
+ for (StreamColumn col : this.getColumns()) {
+ if (col.getName().equals(column)) {
+ return i;
+ }
+ i++;
+ }
+ return -1;
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
+ public StreamDefinition copy() {
+ StreamDefinition copied = new StreamDefinition();
+ copied.setColumns(this.getColumns());
+ copied.setDataSource(this.getDataSource());
+ copied.setDescription(this.getDescription());
+ copied.setSiteId(this.getSiteId());
+ copied.setStreamId(this.getStreamId());
+ copied.setTimeseries(this.isTimeseries());
+ copied.setValidate(this.isValidate());
+ return copied;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml b/eagle-core/eagle-app/eagle-app-base/pom.xml
index a5e295e..21fdfae 100644
--- a/eagle-core/eagle-app/eagle-app-base/pom.xml
+++ b/eagle-core/eagle-app/eagle-app-base/pom.xml
@@ -114,6 +114,17 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-client-base</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>xercesImpl</artifactId>
+ <groupId>xerces</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
index 02c130a..5032aa6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/AbstractEnvironment.java
@@ -16,8 +16,7 @@
*/
package org.apache.eagle.app.environment;
-import org.apache.eagle.app.sink.KafkaStreamSink;
-import org.apache.eagle.app.sink.StreamSinkProvider;
+import org.apache.eagle.app.messaging.*;
import com.typesafe.config.Config;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.slf4j.Logger;
@@ -26,25 +25,25 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractEnvironment implements Environment {
private final Config config;
- private final StreamSinkProvider sinkProvider;
- private static final String APPLICATIONS_SINK_TYPE_PROPS_KEY = "application.sink.provider";
- private static final String DEFAULT_APPLICATIONS_SINK_TYPE = KafkaStreamSink.Provider.class.getName();
+ private final StreamProvider streamProvider;
+ private static final String APPLICATIONS_MESSAGING_TYPE_PROPS_KEY = "application.stream.provider";
+ private static final String DEFAULT_APPLICATIONS_MESSAGING_TYPE = KafkaStreamProvider.class.getName();
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEnvironment.class);
public AbstractEnvironment(Config config) {
this.config = config;
- this.sinkProvider = loadStreamSinkProvider();
+ this.streamProvider = loadStreamProvider();
}
- private StreamSinkProvider loadStreamSinkProvider() {
- String sinkProviderClassName = config.hasPath(APPLICATIONS_SINK_TYPE_PROPS_KEY)
- ? config.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_SINK_TYPE;
+ private StreamProvider loadStreamProvider() {
+ String sinkProviderClassName = config.hasPath(APPLICATIONS_MESSAGING_TYPE_PROPS_KEY)
+ ? config.getString(APPLICATIONS_MESSAGING_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_MESSAGING_TYPE;
try {
Class<?> sinkProviderClass = Class.forName(sinkProviderClassName);
- if (!StreamSinkProvider.class.isAssignableFrom(sinkProviderClass)) {
- throw new IllegalStateException(sinkProviderClassName + "is not assignable from " + StreamSinkProvider.class.getCanonicalName());
+ if (!StreamProvider.class.isAssignableFrom(sinkProviderClass)) {
+ throw new IllegalStateException(sinkProviderClassName + "is not assignable from " + StreamProvider.class.getCanonicalName());
}
- StreamSinkProvider instance = (StreamSinkProvider) sinkProviderClass.newInstance();
+ StreamProvider instance = (StreamProvider) sinkProviderClass.newInstance();
LOGGER.info("Loaded {}", instance);
return instance;
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
@@ -60,12 +59,13 @@ public abstract class AbstractEnvironment implements Environment {
.append(this.config()).build();
}
- public StreamSinkProvider streamSink() {
- return sinkProvider;
+ public StreamProvider stream() {
+ return streamProvider;
}
+
@Override
public Config config() {
return config;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
index 29e90d9..db87693 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/Environment.java
@@ -16,7 +16,7 @@
*/
package org.apache.eagle.app.environment;
-import org.apache.eagle.app.sink.StreamSinkProvider;
+import org.apache.eagle.app.messaging.StreamProvider;
import com.typesafe.config.Config;
import java.io.Serializable;
@@ -31,7 +31,7 @@ public interface Environment extends Serializable {
/**
* TODO Only useful for Storm/Spark Exeuctable Application instead of static web application.
*
- * @return StreamSinkProvider.
+ * @return StreamProvider.
*/
- StreamSinkProvider streamSink();
+ StreamProvider stream();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java
new file mode 100644
index 0000000..836300c
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/AggregateFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for aggregating {@link TransformFunction}s.
+ *
+ * <p>Holds the aggregation settings shared by concrete aggregations: the field being aggregated,
+ * the field the result is stored under, the group-by fields and the window length. The fluent
+ * methods ({@link #asField}, {@link #groupBy}, {@link #windowBy}) mutate this instance and
+ * return it so calls can be chained.</p>
+ */
+public abstract class AggregateFunction implements TransformFunction {
+    private String aggFieldName;
+    private String resultFieldName;
+    private List<String> groupByFieldNames;
+    private long windowLengthMs;
+
+    /**
+     * @return the group-by field names, or {@code null} if none were set.
+     */
+    public List<String> getGroupByFieldNames() {
+        return groupByFieldNames;
+    }
+
+    public void setGroupByFieldNames(List<String> groupByFieldNames) {
+        this.groupByFieldNames = groupByFieldNames;
+    }
+
+    /**
+     * @return the name of the field the aggregation result is stored under, or {@code null} if unset.
+     */
+    public String getResultFieldName() {
+        return resultFieldName;
+    }
+
+    public void setResultFieldName(String resultFieldName) {
+        this.resultFieldName = resultFieldName;
+    }
+
+    /**
+     * @return the name of the field being aggregated, or {@code null} if unset.
+     */
+    public String getAggFieldName() {
+        return aggFieldName;
+    }
+
+    public void setAggFieldName(String aggFieldName) {
+        this.aggFieldName = aggFieldName;
+    }
+
+    /**
+     * Returns the window length configured through {@link #windowBy(long, TimeUnit)}.
+     *
+     * <p>Without this accessor the value was write-only: the field is private, so no subclass
+     * could ever read the window it was configured with.</p>
+     *
+     * @return window length in milliseconds, {@code 0} if {@code windowBy} was never called.
+     */
+    public long getWindowLengthMs() {
+        return windowLengthMs;
+    }
+
+    /**
+     * Sets the result field name.
+     *
+     * @param resultFieldName name of the field that receives the aggregated value
+     * @return this function, for chaining
+     */
+    public AggregateFunction asField(String resultFieldName) {
+        this.setResultFieldName(resultFieldName);
+        return this;
+    }
+
+    /**
+     * Sets the group-by fields.
+     *
+     * @param groupByFieldNames field names to group by
+     * @return this function, for chaining
+     */
+    public AggregateFunction groupBy(String... groupByFieldNames) {
+        this.setGroupByFieldNames(Arrays.asList(groupByFieldNames));
+        return this;
+    }
+
+    /**
+     * Sets the window length, stored internally in milliseconds.
+     *
+     * @param windowLength window length expressed in {@code timeUnit}
+     * @param timeUnit     unit of {@code windowLength}
+     * @return this function, for chaining
+     */
+    public AggregateFunction windowBy(long windowLength, TimeUnit timeUnit) {
+        this.windowLengthMs = timeUnit.toMillis(windowLength);
+        return this;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
new file mode 100644
index 0000000..83f00db
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.MetricStreamPersist;
+import org.apache.eagle.app.messaging.StormStreamSource;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Storm Application Builder DSL.
+ *
+ * <p>Wraps a single {@link TopologyBuilder}. Every stream object created through this builder
+ * registers its spout or bolt on that shared topology, using component ids produced by
+ * {@code generateId} (prefix plus an incrementing counter).</p>
+ */
+public class ApplicationBuilder {
+    private final StormEnvironment environment;
+    private final Config appConfig;
+    private final TopologyBuilder topologyBuilder;
+    private final AtomicInteger identifier;
+
+    public ApplicationBuilder(Config appConfig, StormEnvironment environment) {
+        this.appConfig = appConfig;
+        this.environment = environment;
+        this.identifier = new AtomicInteger(0);
+        this.topologyBuilder = new TopologyBuilder();
+    }
+
+    /**
+     * Terminal step of the DSL: anything that can be turned into a topology.
+     */
+    public class BuilderContext {
+        public StormTopology toTopology() {
+            return topologyBuilder.createTopology();
+        }
+    }
+
+    /**
+     * A stream backed by a component id that downstream bolts can subscribe to.
+     */
+    public abstract class InitializedStream extends BuilderContext {
+        private String id;
+
+        InitializedStream(String id) {
+            Preconditions.checkNotNull(id);
+            this.id = id;
+        }
+
+        String getId() {
+            return this.id;
+        }
+
+        /**
+         * Persist source data stream as metric.
+         *
+         * <p>Adds a {@link MetricStreamPersist} bolt that shuffle-subscribes to this stream.</p>
+         */
+        public BuilderContext saveAsMetric(MetricDefinition metricDefinition) {
+            topologyBuilder.setBolt(generateId("MetricPersist"), new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId());
+            return this;
+        }
+
+        /**
+         * Adds a {@link TransformFunctionBolt} wrapping {@code function} that shuffle-subscribes
+         * to this stream.
+         *
+         * <p>NOTE: {@link TransformedStream} is not implemented yet, so this call currently
+         * fails with {@link IllegalStateException} after the bolt has been registered.</p>
+         */
+        public TransformedStream transformBy(TransformFunction function) {
+            String componentId = generateId(function.getName());
+            topologyBuilder.setBolt(componentId, new TransformFunctionBolt(function)).shuffleGrouping(getId());
+            return new TransformedStream(componentId);
+        }
+    }
+
+    /**
+     * A stream produced by a spout registered on the shared topology.
+     */
+    public class SourcedStream extends InitializedStream {
+        private final Config appConfig;
+        private final StormStreamSource streamSource;
+
+        /**
+         * Creates a second handle on an already registered source stream.
+         *
+         * <p>Must NOT go through the registering constructor: the spout is already declared
+         * under this id, and declaring the same component id twice is rejected by the
+         * topology builder.</p>
+         */
+        private SourcedStream(SourcedStream withSourcedStream) {
+            super(withSourcedStream.getId());
+            this.appConfig = withSourcedStream.appConfig;
+            this.streamSource = withSourcedStream.streamSource;
+        }
+
+        /**
+         * Creates the stream and registers {@code streamSource} as a spout under {@code componentId}.
+         */
+        private SourcedStream(String componentId, Config appConfig, StormStreamSource streamSource) {
+            super(componentId);
+            this.appConfig = appConfig;
+            this.streamSource = streamSource;
+            topologyBuilder.setSpout(componentId, streamSource);
+        }
+    }
+
+    /**
+     * Placeholder for the output of a transformation; construction always fails for now.
+     */
+    public class TransformedStream extends InitializedStream {
+        public TransformedStream(String id) {
+            super(id);
+            throw new IllegalStateException("TODO: Not implemented yet");
+        }
+    }
+
+    public TopologyBuilder getTopologyBuilder() {
+        return this.topologyBuilder;
+    }
+
+    public StormTopology createTopology() {
+        return topologyBuilder.createTopology();
+    }
+
+
+    /**
+     * Registers the environment's stream source for {@code streamId} as a new spout and
+     * returns a handle on it.
+     */
+    public SourcedStream fromStream(String streamId) {
+        return new SourcedStream(generateId("SourcedStream-" + streamId), this.appConfig, environment.getStreamSource(streamId, this.appConfig));
+    }
+
+    /**
+     * Returns a new handle on an existing source stream without registering another spout.
+     */
+    public SourcedStream fromStream(SourcedStream sourcedStream) {
+        return new SourcedStream(sourcedStream);
+    }
+
+    private String generateId(String prefix) {
+        return String.format("%s_%s", prefix, this.identifier.getAndIncrement());
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
new file mode 100644
index 0000000..dd3b214
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.util.Map;
+
+/**
+ * TODO: Not implemented yet.
+ *
+ * <p>{@link TransformFunction} described by a Siddhi query plus its input and output stream
+ * ids. The lifecycle methods ({@code open}, {@code transform}, {@code close}) are stubs that
+ * throw {@link IllegalStateException}.</p>
+ */
+public class CEPFunction implements TransformFunction {
+
+    private final CEPDefinition cepDefinition;
+    // Runtime-only handle; kept out of the serialized form because Collector is not Serializable.
+    private transient Collector collector;
+
+    public CEPFunction(CEPDefinition cepDefinition) {
+        this.cepDefinition = cepDefinition;
+    }
+
+    public CEPFunction(String siddhiQuery, String inputStreamId, String outputStreamId) {
+        this.cepDefinition = new CEPDefinition(siddhiQuery,inputStreamId, outputStreamId);
+    }
+
+    @Override
+    public String getName() {
+        return "CEPFunction";
+    }
+
+    @Override
+    public void open(Collector collector) {
+        throw new IllegalStateException("TODO: Not implemented yet");
+    }
+
+    @Override
+    public void transform(Map event) {
+        throw new IllegalStateException("TODO: Not implemented yet");
+    }
+
+    @Override
+    public void close() {
+        throw new IllegalStateException("TODO: Not implemented yet");
+    }
+
+    /**
+     * Query text and stream ids of a CEP transformation.
+     *
+     * <p>Serializable because it is a field of {@link CEPFunction}, which is itself
+     * {@code Serializable} through {@link TransformFunction}; a non-serializable field type
+     * would make serializing the function fail with {@code NotSerializableException}.</p>
+     */
+    public static class CEPDefinition implements java.io.Serializable {
+        private String inputStreamId;
+        private String outputStreamId;
+        private String siddhiQuery;
+
+        public CEPDefinition(String siddhiQuery, String inputStreamId, String outputStreamId) {
+            this.siddhiQuery = siddhiQuery;
+            this.inputStreamId = inputStreamId;
+            this.outputStreamId = outputStreamId;
+        }
+
+        public String getSiddhiQuery() {
+            return siddhiQuery;
+        }
+
+        public void setSiddhiQuery(String siddhiQuery) {
+            this.siddhiQuery = siddhiQuery;
+        }
+
+        public String getOutputStreamId() {
+            return outputStreamId;
+        }
+
+        public void setOutputStreamId(String outputStreamId) {
+            this.outputStreamId = outputStreamId;
+        }
+
+        public String getInputStreamId() {
+            return inputStreamId;
+        }
+
+        public void setInputStreamId(String inputStreamId) {
+            this.inputStreamId = inputStreamId;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java
new file mode 100644
index 0000000..f86d101
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/Collector.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.util.Map;
+
+/**
+ * Output sink handed to a {@link TransformFunction} through {@code open(Collector)}.
+ */
+public interface Collector {
+ /**
+ * Accepts one event together with its key.
+ *
+ * @param key key associated with the event
+ * @param event event payload as a raw map
+ */
+ void collect(Object key, Map event);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
new file mode 100644
index 0000000..04c5bf9
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.util.Map;
+
+/**
+ * {@link AggregateFunction} named {@code "MAX"}.
+ *
+ * <p>Only the configuration part exists so far: {@code open}, {@code transform} and
+ * {@code close} all throw {@link IllegalStateException}.</p>
+ */
+public class MaxFunction extends AggregateFunction {
+
+    /**
+     * Factory for a {@code MaxFunction} aggregating the given field.
+     *
+     * @param aggFieldName name of the field to aggregate
+     * @return a new function with its aggregation field set
+     */
+    public static MaxFunction maxOf(String aggFieldName) {
+        final MaxFunction max = new MaxFunction();
+        max.setAggFieldName(aggFieldName);
+        return max;
+    }
+
+    @Override
+    public String getName() {
+        return "MAX";
+    }
+
+    /** Not implemented yet; always throws. */
+    @Override
+    public void open(Collector collector) {
+        throw new IllegalStateException("TODO: Not implemented yet.");
+    }
+
+    /** Not implemented yet; always throws. */
+    @Override
+    public void transform(Map event) {
+        throw new IllegalStateException("TODO: Not implemented yet.");
+    }
+
+    /** Not implemented yet; always throws. */
+    @Override
+    public void close() {
+        throw new IllegalStateException("TODO: Not implemented yet.");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
new file mode 100644
index 0000000..62a81b0
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes how an event map is turned into a metric: where the metric name, timestamp,
+ * dimensions and value come from.
+ *
+ * <p>Instances are created with {@link #namedBy(NameSelector)} or
+ * {@link #namedByField(String)} and refined with the fluent methods, each of which mutates
+ * and returns this instance.</p>
+ */
+public class MetricDefinition implements Serializable {
+
+    /**
+     * Support simple and complex name format, by default using "metric" field.
+     */
+    private NameSelector nameSelector = new FieldNameSelector("metric");
+
+    /**
+     * Support event/system time, by default using system time.
+     */
+    private TimestampSelector timestampSelector = new SystemTimestampSelector();
+
+    /**
+     * Metric dimension field name.
+     */
+    private List<String> dimensionFields;
+
+    /**
+     * Metric value field name.
+     */
+    private String valueField = "value";
+
+    public NameSelector getNameSelector() {
+        return nameSelector;
+    }
+
+    public void setNameSelector(NameSelector nameSelector) {
+        this.nameSelector = nameSelector;
+    }
+
+    public String getValueField() {
+        return valueField;
+    }
+
+    public void setValueField(String valueField) {
+        this.valueField = valueField;
+    }
+
+    /**
+     * @return the dimension field names, or {@code null} if none were configured.
+     */
+    public List<String> getDimensionFields() {
+        return dimensionFields;
+    }
+
+    public void setDimensionFields(List<String> dimensionFields) {
+        this.dimensionFields = dimensionFields;
+    }
+
+    public TimestampSelector getTimestampSelector() {
+        return timestampSelector;
+    }
+
+    public void setTimestampSelector(TimestampSelector timestampSelector) {
+        this.timestampSelector = timestampSelector;
+    }
+
+
+    /**
+     * Derives the metric name from an event.
+     */
+    @FunctionalInterface
+    public interface NameSelector extends Serializable {
+        String getMetricName(Map event);
+    }
+
+    /**
+     * Derives the metric timestamp from an event.
+     */
+    @FunctionalInterface
+    public interface TimestampSelector extends Serializable {
+        Long getTimestamp(Map event);
+    }
+
+    /**
+     * Creates a definition whose metric name is computed by {@code nameSelector}.
+     */
+    public static MetricDefinition namedBy(NameSelector nameSelector) {
+        MetricDefinition metricDefinition = new MetricDefinition();
+        metricDefinition.setNameSelector(nameSelector);
+        return metricDefinition;
+    }
+
+    /**
+     * Creates a definition whose metric name is read from the event field {@code nameField}.
+     */
+    public static MetricDefinition namedByField(String nameField) {
+        MetricDefinition metricDefinition = new MetricDefinition();
+        metricDefinition.setNameSelector(new FieldNameSelector(nameField));
+        return metricDefinition;
+    }
+
+    /**
+     * Switches from system time to the timestamp stored in the event field {@code timestampField}.
+     */
+    public MetricDefinition eventTimeByField(String timestampField) {
+        this.setTimestampSelector(new EventTimestampSelector(timestampField));
+        return this;
+    }
+
+    public MetricDefinition dimensionFields(String... dimensionFields) {
+        this.setDimensionFields(Arrays.asList(dimensionFields));
+        return this;
+    }
+
+    public MetricDefinition valueField(String valueField) {
+        this.setValueField(valueField);
+        return this;
+    }
+
+    /**
+     * Reads the timestamp from a named event field.
+     *
+     * <p>Declared {@code static} like the other selectors so that serializing it does not
+     * drag along the enclosing {@link MetricDefinition} instance.</p>
+     */
+    public static class EventTimestampSelector implements TimestampSelector {
+        private final String timestampField;
+
+        EventTimestampSelector(String timestampField) {
+            this.timestampField = timestampField;
+        }
+
+        /**
+         * Returns the timestamp held by the configured field.
+         *
+         * <p>Any {@link Number} is converted with {@code longValue()} (previously everything
+         * except {@code Integer} and {@code Long} failed with {@code ClassCastException});
+         * a {@code String} is trimmed and parsed as a decimal long.</p>
+         *
+         * @param event event to read from
+         * @return the timestamp, or {@code null} when the field is present but holds {@code null}
+         * @throws IllegalArgumentException if the field is absent or holds an unsupported type
+         * @throws NumberFormatException    if the field holds a string that is not a valid long
+         */
+        @Override
+        public Long getTimestamp(Map event) {
+            if (!event.containsKey(timestampField)) {
+                throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists");
+            }
+            Object timestampValue = event.get(timestampField);
+            if (timestampValue == null) {
+                // Kept from the original behavior: a present-but-null field yields null.
+                return null;
+            }
+            if (timestampValue instanceof Number) {
+                return ((Number) timestampValue).longValue();
+            }
+            if (timestampValue instanceof String) {
+                return Long.valueOf(((String) timestampValue).trim());
+            }
+            throw new IllegalArgumentException("Timestamp field '" + timestampField
+                + "' has unsupported type: " + timestampValue.getClass().getName());
+        }
+    }
+
+    /**
+     * Ignores the event and uses the current system time in milliseconds.
+     */
+    public static class SystemTimestampSelector implements TimestampSelector {
+        @Override
+        public Long getTimestamp(Map event) {
+            return System.currentTimeMillis();
+        }
+    }
+
+    /**
+     * Reads the metric name from a named event field.
+     */
+    public static class FieldNameSelector implements NameSelector {
+        private final String fieldName;
+
+        FieldNameSelector(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        /**
+         * @throws IllegalArgumentException if the event has no such field
+         */
+        @Override
+        public String getMetricName(Map event) {
+            if (event.containsKey(fieldName)) {
+                return (String) event.get(fieldName);
+            } else {
+                throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
new file mode 100644
index 0000000..9135cc8
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import backtype.storm.task.OutputCollector;
+
+import java.util.Arrays;
+
+import java.util.Map;
+
+/**
+ * {@link Collector} that forwards every collected event to a Storm {@link OutputCollector}.
+ */
+public class StormOutputCollector implements Collector {
+    private final OutputCollector delegate;
+
+    StormOutputCollector(OutputCollector delegate) {
+        this.delegate = delegate;
+    }
+
+    /**
+     * Emits a two-element tuple: the key first, the event map second.
+     */
+    @Override
+    public void collect(Object key, Map event) {
+        final Object[] tuple = {key, event};
+        delegate.emit(Arrays.asList(tuple));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
new file mode 100644
index 0000000..11974ff
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A serializable event transformation that TransformFunctionBolt runs inside a Storm bolt.
+ *
+ * <p>TransformFunctionBolt calls {@code open} from {@code prepare}, {@code transform} once
+ * per incoming tuple from {@code execute}, and {@code close} from {@code cleanup}.</p>
+ */
+public interface TransformFunction extends Serializable {
+ /**
+ * Name of this function; ApplicationBuilder uses it as the prefix of the generated component id.
+ */
+ String getName();
+
+ /**
+ * Hands the function the collector it may use for output.
+ *
+ * @param collector sink for output events
+ */
+ void open(Collector collector);
+
+ /**
+ * Processes one event.
+ *
+ * @param event event payload as a raw map
+ */
+ void transform(Map event);
+
+ /**
+ * Invoked when the hosting bolt is cleaned up.
+ */
+ void close();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
new file mode 100644
index 0000000..dbc7239
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Storm bolt that drives a {@link TransformFunction}: opens it on prepare, feeds it one event
+ * per tuple and closes it on cleanup.
+ */
+public class TransformFunctionBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(TransformFunctionBolt.class);
+    private final TransformFunction function;
+    private OutputCollector collector;
+
+    public TransformFunctionBolt(TransformFunction function) {
+        this.function = function;
+    }
+
+    /**
+     * Opens the function with a {@link StormOutputCollector} wrapping the bolt's collector.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.function.open(new StormOutputCollector(collector));
+        this.collector = collector;
+    }
+
+    /**
+     * Converts the tuple to an event, passes it to the function and acks the tuple.
+     *
+     * <p>If anything goes wrong the error is logged and reported, and the tuple is explicitly
+     * failed. Previously it was neither acked nor failed, which left it pending until the
+     * topology message timeout expired.</p>
+     */
+    @Override
+    public void execute(Tuple input) {
+        try {
+            this.function.transform(StreamConvertHelper.tupleToEvent(input).f1());
+            this.collector.ack(input);
+        } catch (Throwable throwable) {
+            LOG.error("Transform error: {}", input, throwable);
+            this.collector.reportError(throwable);
+            // Every tuple must end up acked or failed; fail it so the spout learns immediately.
+            this.collector.fail(input);
+        }
+    }
+
+    /**
+     * Declares the two output fields {@code f1} and {@code f2}.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1","f2"));
+    }
+
+    @Override
+    public void cleanup() {
+        this.function.close();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 18675a6..ae52cd0 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -17,8 +17,13 @@
package org.apache.eagle.app.environment.impl;
import org.apache.eagle.app.environment.AbstractEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.environment.builder.ApplicationBuilder;
+import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.TransformFunction;
+import org.apache.eagle.app.environment.builder.TransformFunctionBolt;
+import org.apache.eagle.app.messaging.*;
import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.StreamSourceConfig;
/**
* Storm Execution Environment Context.
@@ -28,7 +33,30 @@ public class StormEnvironment extends AbstractEnvironment {
super(envConfig);
}
+ // ----------------------------------
+ // Classic Storm Topology Builder API
+ // ----------------------------------
public StormStreamSink getStreamSink(String streamId, Config config) {
- return ((StormStreamSink) streamSink().getSink(streamId,config));
+ return ((StormStreamSink) stream().getSink(streamId,config));
}
-}
+
+ public StormStreamSource getStreamSource(String streamId, Config config) {
+ return (StormStreamSource) stream().getSource(streamId,config);
+ }
+
+ public MetricStreamPersist getMetricPersist(MetricDefinition metricDefinition, Config config) {
+ return new MetricStreamPersist(metricDefinition, config);
+ }
+
+ public TransformFunctionBolt getTransformer(TransformFunction function) {
+ return new TransformFunctionBolt(function);
+ }
+
+ // ----------------------------------
+ // Fluent Storm App Builder API
+ // ----------------------------------
+
+ public ApplicationBuilder newApp(Config appConfig) {
+ return new ApplicationBuilder(appConfig, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
new file mode 100644
index 0000000..d1cecc9
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+/**
+ * {@link StreamSinkConfig} that only remembers the sink class and reports the storage type
+ * {@code "NONE"}.
+ */
+public class DefaultStreamSinkConfig implements StreamSinkConfig {
+    private static final String NONE_STORAGE_TYPE = "NONE";
+
+    private final Class<?> streamPersistClass;
+
+    /**
+     * @param streamPersistClass class later returned by {@link #getSinkType()}
+     */
+    public DefaultStreamSinkConfig(Class<?> streamPersistClass) {
+        this.streamPersistClass = streamPersistClass;
+    }
+
+    /**
+     * @return this config's own class
+     */
+    @Override
+    public Class<? extends StreamSinkConfig> getConfigType() {
+        return DefaultStreamSinkConfig.class;
+    }
+
+    /**
+     * @return the class passed to the constructor
+     */
+    public Class<?> getSinkType() {
+        return this.streamPersistClass;
+    }
+
+    /**
+     * @return always {@code "NONE"}
+     */
+    @Override
+    public String getType() {
+        return NONE_STORAGE_TYPE;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
new file mode 100644
index 0000000..c8fe1b5
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Maps a Storm {@link Tuple} to exactly one {@link StreamEvent} whose data array holds the
+ * tuple values in field order.
+ */
+public class FlattenEventMapper implements StreamEventMapper {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlattenEventMapper.class);
+    private static final String TIMESTAMP_FIELD = "timestamp";
+
+    private final String streamId;
+
+    /**
+     * Creates the mapper.
+     *
+     * @param streamId stream id stamped on every event produced by {@link #map(Tuple)}
+     */
+    public FlattenEventMapper(String streamId) {
+        this.streamId = streamId;
+    }
+
+    /**
+     * Converts the tuple into a single-element list of {@link StreamEvent}.
+     *
+     * @param tuple tuple to flatten
+     * @return immutable list containing the one mapped event
+     * @throws Exception declared by the mapper contract; propagated from tuple access
+     */
+    @Override
+    public List<StreamEvent> map(Tuple tuple) throws Exception {
+        long timestamp = resolveTimestamp(tuple);
+
+        // Hoisted so the field list is not re-queried on every iteration.
+        final int fieldCount = tuple.getFields().size();
+        Object[] values = new Object[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            values[i] = tuple.getValue(i);
+        }
+
+        StreamEvent event = new StreamEvent();
+        event.setTimestamp(timestamp);
+        event.setStreamId(streamId);
+        event.setData(values);
+        return Collections.singletonList(event);
+    }
+
+    /**
+     * Picks the event timestamp: the current wall-clock time when the tuple declares no
+     * {@code timestamp} field, the numeric value of that field when it has one, and 0 when
+     * the field is present but cannot be read as a number (the failure is logged).
+     */
+    private static long resolveTimestamp(Tuple tuple) {
+        if (!tuple.getFields().contains(TIMESTAMP_FIELD)) {
+            return System.currentTimeMillis();
+        }
+        try {
+            Object raw = tuple.getValueByField(TIMESTAMP_FIELD);
+            if (raw instanceof Number) {
+                // Accept any boxed numeric type, not only Long, so an Integer timestamp
+                // is no longer turned into 0 by a ClassCastException.
+                return ((Number) raw).longValue();
+            }
+            // null or non-numeric value: the typed getter fails and is handled below.
+            return tuple.getLongByField(TIMESTAMP_FIELD);
+        } catch (Exception ex) {
+            // The field exists but its value is null or not convertible to long.
+            LOGGER.error(ex.getMessage(), ex);
+            return 0;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
new file mode 100644
index 0000000..987ed0b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * General JSON scheme: parses the raw byte payload as a JSON object and emits it as a
+ * two-field tuple ({@code f1} = hash code of the parsed map, {@code f2} = the map itself).
+ * Different from org.apache.eagle.alert.engine.scheme.JsonScheme, which is only for multi-topic cases.
+ *
+ * @see org.apache.eagle.alert.engine.scheme.JsonScheme
+ */
+public class JsonSchema implements Scheme {
+    private static final long serialVersionUID = -8352896475656975577L;
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonSchema.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Declares the two output fields, {@code f1} and {@code f2}.
+     */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("f1","f2");
+    }
+
+    /**
+     * Deserializes the payload into {@code [map.hashCode(), map]}.
+     *
+     * @param ser raw payload, may be {@code null}
+     * @return the two tuple values, or {@code null} when the payload is {@code null}, is the
+     *         JSON literal {@code null}, or cannot be parsed as a JSON object
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public List<Object> deserialize(byte[] ser) {
+        if (ser == null) {
+            LOG.debug("Content is null, ignore");
+            return null;
+        }
+        try {
+            Map map = mapper.readValue(ser, Map.class);
+            if (map == null) {
+                // The payload was the JSON literal "null"; without this guard map.hashCode()
+                // would throw an uncaught NullPointerException.
+                LOG.debug("Content is null, ignore");
+                return null;
+            }
+            return Arrays.asList(map.hashCode(), map);
+        } catch (IOException e) {
+            // The Charset overload cannot throw, so no nested try/catch is needed here.
+            LOG.error("Failed to deserialize as JSON: {}", new String(ser, java.nio.charset.StandardCharsets.UTF_8), e);
+            return null;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
new file mode 100644
index 0000000..13080a1
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.spout.Scheme;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StreamProvider} that builds Kafka sink/source instances and fills their configs from
+ * the {@code dataSinkConfig.*} and {@code dataSourceConfig.*} sections of a {@link Config}.
+ */
+public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, KafkaStreamSinkConfig,KafkaStreamSource,KafkaStreamSourceConfig> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamProvider.class);
+    private static final String DEFAULT_SHARED_SINK_TOPIC_CONF_KEY = "dataSinkConfig.topic";
+    private static final String DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY = "dataSourceConfig.topic";
+
+    /**
+     * Returns the string at {@code path}, or {@code defaultValue} when the path is absent.
+     */
+    private static String getStringOrDefault(Config config, String path, String defaultValue) {
+        return config.hasPath(path) ? config.getString(path) : defaultValue;
+    }
+
+    /**
+     * Resolves the sink topic: {@code dataSinkConfig.<streamId>.topic} first, then the shared
+     * {@code dataSinkConfig.topic}.
+     *
+     * @throws IllegalArgumentException when neither key is present
+     */
+    private String getSinkTopicName(String streamId, Config config) {
+        String streamSpecificTopicConfigKey = String.format("dataSinkConfig.%s.topic",streamId);
+        if (config.hasPath(streamSpecificTopicConfigKey)) {
+            return config.getString(streamSpecificTopicConfigKey);
+        }
+        if (config.hasPath(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY)) {
+            String sharedTopic = config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY);
+            LOG.warn("Using default shared sink topic {}: {}", DEFAULT_SHARED_SINK_TOPIC_CONF_KEY, sharedTopic);
+            return sharedTopic;
+        }
+        LOG.error("Neither stream specific topic: {} nor default shared topic: {} found in config", streamSpecificTopicConfigKey, DEFAULT_SHARED_SINK_TOPIC_CONF_KEY);
+        throw new IllegalArgumentException("Neither stream specific topic: "
+            + streamSpecificTopicConfigKey + " nor default shared topic: " + DEFAULT_SHARED_SINK_TOPIC_CONF_KEY + " found in config");
+    }
+
+    /**
+     * Resolves the source topic: {@code dataSourceConfig.<streamId>.topic} first, then the
+     * shared {@code dataSourceConfig.topic}, and finally whatever
+     * {@link #getSinkTopicName(String, Config)} resolves.
+     */
+    private String getSourceTopicName(String streamId, Config config) {
+        String streamSpecificTopicConfigKey = String.format("dataSourceConfig.%s.topic",streamId);
+        if (config.hasPath(streamSpecificTopicConfigKey)) {
+            return config.getString(streamSpecificTopicConfigKey);
+        }
+        if (config.hasPath(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)) {
+            String sharedTopic = config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY);
+            LOG.warn("Using default shared source topic {}: {}", DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY, sharedTopic);
+            return sharedTopic;
+        }
+        // Report the source key that was actually looked up (the sink key was logged before).
+        LOG.debug("Neither stream specific topic: {} nor default shared topic: {} found in config, try sink config instead", streamSpecificTopicConfigKey, DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY);
+        return getSinkTopicName(streamId,config);
+    }
+
+    /**
+     * Builds the sink config for {@code streamId}. {@code dataSinkConfig.brokerList} is read
+     * unconditionally; the remaining producer settings fall back to the defaults below.
+     */
+    @Override
+    public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
+        KafkaStreamSinkConfig sinkConfig = new KafkaStreamSinkConfig();
+        sinkConfig.setTopicId(getSinkTopicName(streamId,config));
+        sinkConfig.setBrokerList(config.getString("dataSinkConfig.brokerList"));
+        sinkConfig.setSerializerClass(
+            getStringOrDefault(config, "dataSinkConfig.serializerClass", "kafka.serializer.StringEncoder"));
+        sinkConfig.setKeySerializerClass(
+            getStringOrDefault(config, "dataSinkConfig.keySerializerClass", "kafka.serializer.StringEncoder"));
+
+        // new added properties for async producer
+        sinkConfig.setNumBatchMessages(getStringOrDefault(config, "dataSinkConfig.numBatchMessages", "1024"));
+        sinkConfig.setProducerType(getStringOrDefault(config, "dataSinkConfig.producerType", "async"));
+        sinkConfig.setMaxQueueBufferMs(getStringOrDefault(config, "dataSinkConfig.maxQueueBufferMs", "3000"));
+        sinkConfig.setRequestRequiredAcks(getStringOrDefault(config, "dataSinkConfig.requestRequiredAcks", "1"));
+
+        return sinkConfig;
+    }
+
+    @Override
+    public KafkaStreamSink getSink() {
+        return new KafkaStreamSink();
+    }
+
+    /**
+     * Builds the source config for {@code streamId}. {@code dataSourceConfig.zkConnection} is
+     * read unconditionally; every other setting is applied only when its key is present.
+     */
+    @Override
+    public KafkaStreamSourceConfig getSourceConfig(String streamId, Config config) {
+        KafkaStreamSourceConfig sourceConfig = new KafkaStreamSourceConfig();
+
+        sourceConfig.setTopicId(getSourceTopicName(streamId,config));
+        sourceConfig.setBrokerZkQuorum(config.getString("dataSourceConfig.zkConnection"));
+
+        if (config.hasPath("dataSourceConfig.fetchSize")) {
+            sourceConfig.setFetchSize(config.getInt("dataSourceConfig.fetchSize"));
+        }
+        if (config.hasPath("dataSourceConfig.transactionZKRoot")) {
+            sourceConfig.setTransactionZKRoot(config.getString("dataSourceConfig.transactionZKRoot"));
+        }
+        if (config.hasPath("dataSourceConfig.consumerGroupId")) {
+            sourceConfig.setConsumerGroupId(config.getString("dataSourceConfig.consumerGroupId"));
+        }
+        if (config.hasPath("dataSourceConfig.brokerZkPath")) {
+            sourceConfig.setBrokerZkPath(config.getString("dataSourceConfig.brokerZkPath"));
+        }
+        if (config.hasPath("dataSourceConfig.txZkServers")) {
+            sourceConfig.setTransactionZkServers(config.getString("dataSourceConfig.txZkServers"));
+        }
+        if (config.hasPath("dataSourceConfig.transactionStateUpdateMS")) {
+            sourceConfig.setTransactionStateUpdateMS(config.getLong("dataSourceConfig.transactionStateUpdateMS"));
+        }
+        if (config.hasPath("dataSourceConfig.startOffsetTime")) {
+            sourceConfig.setStartOffsetTime(config.getInt("dataSourceConfig.startOffsetTime"));
+        }
+        if (config.hasPath("dataSourceConfig.forceFromStart")) {
+            sourceConfig.setForceFromStart(config.getBoolean("dataSourceConfig.forceFromStart"));
+        }
+        if (config.hasPath("dataSourceConfig.schemeCls")) {
+            String schemeClassName = config.getString("dataSourceConfig.schemeCls");
+            try {
+                // asSubclass checks the type up front instead of relying on an unchecked cast.
+                sourceConfig.setSchemaClass(Class.forName(schemeClassName).asSubclass(Scheme.class));
+            } catch (ClassNotFoundException e) {
+                LOG.error("Class not found error, dataSourceConfig.schemeCls = {}",schemeClassName,e);
+            } catch (ClassCastException e) {
+                LOG.error("dataSourceConfig.schemeCls = {} does not implement {}", schemeClassName, Scheme.class.getName(), e);
+            }
+        }
+        return sourceConfig;
+    }
+
+    @Override
+    public KafkaStreamSource getSource() {
+        return new KafkaStreamSource();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
new file mode 100644
index 0000000..696d79f
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Storm stream sink that serializes each event map to JSON and sends it to the Kafka topic
+ * named by its {@link KafkaStreamSinkConfig}.
+ */
+public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class);
+    // One shared mapper instead of allocating a new ObjectMapper for every event.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private String topicId;
+    private Producer<String, String> producer;
+    private KafkaStreamSinkConfig config;
+
+    /**
+     * Stores the sink config and the target topic taken from it.
+     */
+    @Override
+    public void init(String streamId, KafkaStreamSinkConfig config) {
+        super.init(streamId, config);
+        this.topicId = config.getTopicId();
+        this.config = config;
+    }
+
+    /**
+     * Creates the Kafka producer from the config captured in {@link #init}.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        super.prepare(stormConf, context, collector);
+        Properties properties = new Properties();
+        properties.put("metadata.broker.list", config.getBrokerList());
+        properties.put("serializer.class", config.getSerializerClass());
+        properties.put("key.serializer.class", config.getKeySerializerClass());
+        // new added properties for async producer
+        properties.put("producer.type", config.getProducerType());
+        properties.put("batch.num.messages", config.getNumBatchMessages());
+        properties.put("request.required.acks", config.getRequestRequiredAcks());
+        properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
+        producer = new Producer<>(new ProducerConfig(properties));
+    }
+
+    /**
+     * Writes the event as a JSON string to the configured topic.
+     *
+     * @param key       event key; deliberately not used as the Kafka partition key
+     * @param event     event payload to serialize
+     * @param collector Storm output collector (unused here)
+     * @throws Exception any serialization or send failure, rethrown after being logged
+     */
+    @Override
+    protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
+        try {
+            String output = OBJECT_MAPPER.writeValueAsString(event);
+            // The message is sent without a partition key: partition key may cause data skew.
+            producer.send(new KeyedMessage<String, String>(this.topicId, output));
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            throw ex;
+        }
+    }
+
+    @Override
+    public void afterInstall() {
+        ensureTopicCreated();
+    }
+
+    // Placeholder: only logs; topic creation is not implemented yet.
+    private void ensureTopicCreated() {
+        LOG.info("TODO: ensure kafka topic {} created", this.topicId);
+    }
+
+    // Placeholder: only logs; topic deletion is not implemented yet.
+    private void ensureTopicDeleted() {
+        LOG.info("TODO: ensure kafka topic {} deleted", this.topicId);
+    }
+
+    /**
+     * Closes the producer if {@link #prepare} created one.
+     */
+    @Override
+    public void cleanup() {
+        if (this.producer != null) {
+            this.producer.close();
+        }
+    }
+
+    @Override
+    public void afterUninstall() {
+        ensureTopicDeleted();
+    }
+}
\ No newline at end of file