You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/30 01:38:28 UTC
[4/5] incubator-eagle git commit: [EAGLE-806] Integrate Metric
Process and Persistence with Application Framework
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
new file mode 100644
index 0000000..4a72709
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+/**
+ * FIXME Rename to KafkaStreamMessagingConfig.
+ */
+public class KafkaStreamSinkConfig implements StreamSinkConfig {
+ // Write Config
+ private String topicId;
+ private String brokerList;
+ private String serializerClass;
+ private String keySerializerClass;
+ private String numBatchMessages;
+ private String maxQueueBufferMs;
+ private String producerType;
+ private String requestRequiredAcks;
+
+ public String getTopicId() {
+ return topicId;
+ }
+
+ public void setTopicId(String topicId) {
+ this.topicId = topicId;
+ }
+
+ public String getBrokerList() {
+ return brokerList;
+ }
+
+ public void setBrokerList(String brokerList) {
+ this.brokerList = brokerList;
+ }
+
+ public String getSerializerClass() {
+ return serializerClass;
+ }
+
+ public void setSerializerClass(String serializerClass) {
+ this.serializerClass = serializerClass;
+ }
+
+ public String getKeySerializerClass() {
+ return keySerializerClass;
+ }
+
+ public void setKeySerializerClass(String keySerializerClass) {
+ this.keySerializerClass = keySerializerClass;
+ }
+
+ public String getNumBatchMessages() {
+ return numBatchMessages;
+ }
+
+ public void setNumBatchMessages(String numBatchMessages) {
+ this.numBatchMessages = numBatchMessages;
+ }
+
+ public String getMaxQueueBufferMs() {
+ return maxQueueBufferMs;
+ }
+
+ public void setMaxQueueBufferMs(String maxQueueBufferMs) {
+ this.maxQueueBufferMs = maxQueueBufferMs;
+ }
+
+ public String getProducerType() {
+ return producerType;
+ }
+
+ public void setProducerType(String producerType) {
+ this.producerType = producerType;
+ }
+
+ public String getRequestRequiredAcks() {
+ return requestRequiredAcks;
+ }
+
+ public void setRequestRequiredAcks(String requestRequiredAcks) {
+ this.requestRequiredAcks = requestRequiredAcks;
+ }
+
+ @Override
+ public String getType() {
+ return "KAFKA";
+ }
+
+ @Override
+ public Class<?> getSinkType() {
+ return KafkaStreamSink.class;
+ }
+
+ @Override
+ public Class<? extends StreamSinkConfig> getConfigType() {
+ return KafkaStreamSinkConfig.class;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
new file mode 100644
index 0000000..9edf644
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.spout.SchemeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.*;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class KafkaStreamSource extends StormStreamSource<KafkaStreamSourceConfig> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSource.class);
+ private KafkaSpout spout;
+
+ @Override
+ public void init(String streamId, KafkaStreamSourceConfig config) {
+ this.spout = createKafkaSpout(config);
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.spout.open(conf, context, collector);
+ }
+
+ @Override
+ public void nextTuple() {
+ this.spout.nextTuple();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ this.spout.declareOutputFields(declarer);
+ }
+
+ @Override
+ public void close() {
+ this.spout.close();
+ }
+
+ @Override
+ public void activate() {
+ this.spout.activate();
+ }
+
+ @Override
+ public void deactivate() {
+ this.spout.deactivate();
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ this.spout.ack(msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ this.spout.fail(msgId);
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return this.spout.getComponentConfiguration();
+ }
+
+ // ----------------
+ // Helper Methods
+ // ----------------
+
+ private static KafkaSpout createKafkaSpout(KafkaStreamSourceConfig config) {
+
+ // the following is for fetching data from one topic
+ // Kafka topic
+ String topic = config.getTopicId();
+ // Kafka broker zk connection
+ String zkConnString = config.getBrokerZkQuorum();
+ // Kafka fetch size
+ int fetchSize = config.getFetchSize();
+ LOG.info(String.format("Use topic : %s, zkQuorum : %s , fetchSize : %d", topic, zkConnString, fetchSize));
+
+ /*
+ the following is for recording offset for processing the data
+ the zk path to store current offset is comprised of the following
+ offset zkPath = zkRoot + "/" + topic + "/" + consumerGroupId + "/" + partition_Id
+
+ consumerGroupId is for differentiating different consumers which consume the same topic
+ */
+ // transaction zkRoot
+ String zkRoot = config.getTransactionZKRoot();
+ // Kafka consumer group id
+ String groupId = config.getConsumerGroupId();
+ String brokerZkPath = config.getBrokerZkPath();
+
+ BrokerHosts hosts;
+ if (brokerZkPath == null) {
+ hosts = new ZkHosts(zkConnString);
+ } else {
+ hosts = new ZkHosts(zkConnString, brokerZkPath);
+ }
+
+ SpoutConfig spoutConfig = new SpoutConfig(hosts,
+ topic,
+ zkRoot + "/" + topic,
+ groupId);
+
+ // transaction zkServers to store kafka consumer offset. Default to use storm zookeeper
+ if (config.getTransactionZkServers() != null) {
+ String[] txZkServers = config.getTransactionZkServers().split(",");
+ spoutConfig.zkServers = Arrays.asList(txZkServers).stream().map(server -> server.split(":")[0]).collect(Collectors.toList());
+ spoutConfig.zkPort = Integer.parseInt(txZkServers[0].split(":")[1]);
+ LOG.info("txZkServers:" + spoutConfig.zkServers + ", zkPort:" + spoutConfig.zkPort);
+ }
+
+ // transaction update interval
+ spoutConfig.stateUpdateIntervalMs = config.getTransactionStateUpdateMS();
+ // Kafka fetch size
+ spoutConfig.fetchSizeBytes = fetchSize;
+ spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
+
+ // "startOffsetTime" is for test usage, prod should not use this
+ if (config.getStartOffsetTime() >= 0) {
+ spoutConfig.startOffsetTime = config.getStartOffsetTime();
+ }
+ // "forceFromStart" is for test usage, prod should not use this
+ if (config.isForceFromStart()) {
+ spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+ }
+
+ Preconditions.checkNotNull(config.getSchemaClass(), "schemaClass is null");
+ try {
+ Scheme s = config.getSchemaClass().newInstance();
+ spoutConfig.scheme = new SchemeAsMultiScheme(s);
+ } catch (Exception ex) {
+ LOG.error("Error instantiating scheme object");
+ throw new IllegalStateException(ex);
+ }
+ return new KafkaSpout(spoutConfig);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
new file mode 100644
index 0000000..5a9c162
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.metadata.model.StreamSourceConfig;
+
+public class KafkaStreamSourceConfig implements StreamSourceConfig {
+
+ private static final String DEFAULT_CONFIG_PREFIX = "dataSourceConfig";
+ private static final String DEFAULT_CONSUMER_GROUP_ID = "eagleKafkaSource";
+ private static final String DEFAULT_TRANSACTION_ZK_ROOT = "/consumers";
+ private static final Class<? extends backtype.storm.spout.Scheme> DEFAULT_KAFKA_SCHEMA = JsonSchema.class;
+
+ // Read Config
+ private String topicId;
+ private String brokerZkQuorum;
+ private String brokerZkBasePath;
+ private String transactionZkServers;
+
+ private int fetchSize = 1048576;
+ private String transactionZKRoot = DEFAULT_TRANSACTION_ZK_ROOT;
+ private String consumerGroupId = DEFAULT_CONSUMER_GROUP_ID;
+ private String brokerZkPath = null;
+ private long transactionStateUpdateMS = 2000;
+ private int startOffsetTime = -1;
+ private boolean forceFromStart = false;
+ private Class<? extends backtype.storm.spout.Scheme> schemaClass = DEFAULT_KAFKA_SCHEMA;
+
+ public String getBrokerZkQuorum() {
+ return brokerZkQuorum;
+ }
+
+ public void setBrokerZkQuorum(String brokerZkQuorum) {
+ this.brokerZkQuorum = brokerZkQuorum;
+ }
+
+ public String getBrokerZkBasePath() {
+ return brokerZkBasePath;
+ }
+
+ public void setBrokerZkBasePath(String brokerZkBasePath) {
+ this.brokerZkBasePath = brokerZkBasePath;
+ }
+
+ public int getFetchSize() {
+ return fetchSize;
+ }
+
+ public void setFetchSize(int fetchSize) {
+ this.fetchSize = fetchSize;
+ }
+
+ public String getTransactionZKRoot() {
+ return transactionZKRoot;
+ }
+
+ public void setTransactionZKRoot(String transactionZKRoot) {
+ this.transactionZKRoot = transactionZKRoot;
+ }
+
+ public String getConsumerGroupId() {
+ return consumerGroupId;
+ }
+
+ public void setConsumerGroupId(String consumerGroupId) {
+ this.consumerGroupId = consumerGroupId;
+ }
+
+ public String getBrokerZkPath() {
+ return brokerZkPath;
+ }
+
+ public void setBrokerZkPath(String brokerZkPath) {
+ this.brokerZkPath = brokerZkPath;
+ }
+
+ public String getTransactionZkServers() {
+ return transactionZkServers;
+ }
+
+ public void setTransactionZkServers(String transactionZkServers) {
+ this.transactionZkServers = transactionZkServers;
+ }
+
+ public long getTransactionStateUpdateMS() {
+ return transactionStateUpdateMS;
+ }
+
+ public void setTransactionStateUpdateMS(long transactionStateUpdateMS) {
+ this.transactionStateUpdateMS = transactionStateUpdateMS;
+ }
+
+ public int getStartOffsetTime() {
+ return startOffsetTime;
+ }
+
+ public void setStartOffsetTime(int startOffsetTime) {
+ this.startOffsetTime = startOffsetTime;
+ }
+
+ public boolean isForceFromStart() {
+ return forceFromStart;
+ }
+
+ public void setForceFromStart(boolean forceFromStart) {
+ this.forceFromStart = forceFromStart;
+ }
+
+ public Class<? extends backtype.storm.spout.Scheme> getSchemaClass() {
+ return schemaClass;
+ }
+
+ public void setSchemaClass(Class<? extends backtype.storm.spout.Scheme> schemaClass) {
+ this.schemaClass = schemaClass;
+ }
+
+ @Override
+ public String getType() {
+ return "KAFKA";
+ }
+
+ @Override
+ public Class<?> getSourceType() {
+ return KafkaStreamSource.class;
+ }
+
+ @Override
+ public Class<? extends StreamSourceConfig> getConfigType() {
+ return StreamSourceConfig.class;
+ }
+
+ public String getTopicId() {
+ return topicId;
+ }
+
+ public void setTopicId(String topicId) {
+ this.topicId = topicId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
new file mode 100644
index 0000000..32b168b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.BatchSender;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricStreamPersist extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
+
+ private final Config config;
+ private final MetricMapper mapper;
+ private final int batchSize;
+ private IEagleServiceClient client;
+ private OutputCollector collector;
+ private BatchSender batchSender;
+
+ public MetricStreamPersist(MetricDefinition metricDefinition, Config config) {
+ this.config = config;
+ this.mapper = new StructuredMetricMapper(metricDefinition);
+ this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
+ }
+
+ public MetricStreamPersist(MetricMapper mapper, Config config) {
+ this.config = config;
+ this.mapper = mapper;
+ this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.client = new EagleServiceClientImpl(config);
+ if (this.batchSize > 0) {
+ this.batchSender = client.batch(this.batchSize);
+ }
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ try {
+ GenericMetricEntity metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
+ if (batchSize <= 1) {
+ GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity));
+ if (!response.isSuccess()) {
+ LOG.error("Service side error: {}", response.getException());
+ collector.reportError(new IllegalStateException(response.getException()));
+ } else {
+ collector.ack(input);
+ }
+ } else {
+ this.batchSender.send(metricEntity);
+ collector.ack(input);
+ }
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage(), ex);
+ collector.reportError(ex);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("f1"));
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ this.client.close();
+ } catch (IOException e) {
+ LOG.error("Close client error: {}", e.getMessage(), e);
+ } finally {
+ super.cleanup();
+ }
+ }
+
+ @FunctionalInterface
+ public interface MetricMapper extends Serializable {
+ GenericMetricEntity map(Map event);
+ }
+
+ public class StructuredMetricMapper implements MetricMapper {
+ private final MetricDefinition metricDefinition;
+
+ private StructuredMetricMapper(MetricDefinition metricDefinition) {
+ this.metricDefinition = metricDefinition;
+ }
+
+ @Override
+ public GenericMetricEntity map(Map event) {
+ String metricName = metricDefinition.getNameSelector().getMetricName(event);
+ Preconditions.checkNotNull(metricName,"Metric name is null");
+ Long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event);
+ Preconditions.checkNotNull(timestamp, "Timestamp is null");
+ Map<String,String> tags = new HashMap<>();
+ for (String dimensionField: metricDefinition.getDimensionFields()) {
+ Preconditions.checkNotNull(dimensionField,"Dimension field name is null");
+ tags.put(dimensionField, (String) event.get(dimensionField));
+ }
+
+ double[] values;
+ if (event.containsKey(metricDefinition.getValueField())) {
+ values = new double[] {(double) event.get(metricDefinition.getValueField())};
+ } else {
+ LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(), event);
+ values = new double[]{0};
+ }
+
+ GenericMetricEntity entity = new GenericMetricEntity();
+ entity.setPrefix(metricName);
+ entity.setTimestamp(timestamp);
+ entity.setTags(tags);
+ entity.setValue(values);
+ return entity;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
new file mode 100644
index 0000000..ef6c8d6
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.apache.eagle.common.utils.Tuple2;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseRichBolt implements StreamSink<K> {
+ private static final Logger LOG = LoggerFactory.getLogger(StormStreamSink.class);
+ private String streamId;
+ private OutputCollector collector;
+
+ @Override
+ public void init(String streamId, K config) {
+ this.streamId = streamId;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ /**
+ * Implicitly hides the Tuple protocol inside code as Tuple[Key,Map].
+ */
+ @Override
+ public void execute(Tuple input) {
+ try {
+ Tuple2<Object,Map> keyValue = StreamConvertHelper.tupleToEvent(input);
+ execute(keyValue.f0(), keyValue.f1(), collector);
+ collector.ack(input);
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage(), ex);
+ collector.reportError(ex);
+ }
+ }
+
+ protected abstract void execute(Object key, Map event, OutputCollector collector) throws Exception;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
new file mode 100644
index 0000000..b31de46
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.topology.base.BaseRichSpout;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+import org.apache.eagle.metadata.model.StreamSourceConfig;
+
+public abstract class StormStreamSource<T extends StreamSourceConfig> extends BaseRichSpout implements StreamSource<T> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
new file mode 100644
index 0000000..42ea36e
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import backtype.storm.tuple.Tuple;
+
+import java.io.Serializable;
+import java.util.List;
+
+@FunctionalInterface
+public interface StreamEventMapper extends Serializable {
+ /**
+ * Map from storm tuple to Stream Event.
+ *
+ * @param tuple
+ * @return
+ * @throws Exception
+ */
+ List<StreamEvent> map(Tuple tuple) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
new file mode 100644
index 0000000..0dbc1a7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+import org.apache.eagle.metadata.model.StreamSourceConfig;
+
+import java.lang.reflect.ParameterizedType;
+
+/**
+ * Stream Messaging Bus.
+ */
+public interface StreamProvider<W extends StreamSink<C>, C extends StreamSinkConfig,
+ R extends StreamSource<F>, F extends StreamSourceConfig> {
+
+ C getSinkConfig(String streamId, Config config);
+
+ W getSink();
+
+ default W getSink(String streamId, Config config) {
+ W s = getSink();
+ s.init(streamId, getSinkConfig(streamId, config));
+ return s;
+ }
+
+ F getSourceConfig(String streamId, Config config);
+
+ R getSource();
+
+ default R getSource(String streamId, Config config) {
+ R i = getSource();
+ i.init(streamId, getSourceConfig(streamId, config));
+ return i;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
new file mode 100644
index 0000000..7ba4a9a
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.app.ApplicationLifecycle;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+public interface StreamSink<T extends StreamSinkConfig> extends ApplicationLifecycle {
+ void init(String streamId,T config);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
new file mode 100644
index 0000000..af3965f
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.metadata.model.StreamSourceConfig;
+
+public interface StreamSource<T extends StreamSourceConfig> {
+ void init(String streamId, T config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index bd0adfe..25b5978 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -29,8 +29,9 @@ import org.apache.eagle.alert.metric.MetricConfigs;
import org.apache.eagle.app.Application;
import org.apache.eagle.app.environment.ExecutionRuntime;
import org.apache.eagle.app.environment.ExecutionRuntimeManager;
-import org.apache.eagle.app.sink.KafkaStreamSinkConfig;
+import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.StreamSourceConfig;
import org.apache.eagle.metadata.utils.StreamIdConversions;
import org.apache.eagle.metadata.model.StreamDesc;
import org.apache.eagle.metadata.model.StreamSinkConfig;
@@ -101,21 +102,32 @@ public class ApplicationAction implements Serializable {
copied.setSiteId(metadata.getSite().getSiteId());
copied.setStreamId(StreamIdConversions.formatSiteStreamId(metadata.getSite().getSiteId(), copied.getStreamId()));
StreamSinkConfig streamSinkConfig = this.runtime.environment()
- .streamSink().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig);
+ .stream().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig);
+
+ StreamSourceConfig streamSourceConfig = null;
+
+ try {
+ streamSourceConfig = this.runtime.environment()
+ .stream().getSourceConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig);
+ } catch (Throwable throwable) {
+ // Ignore source config if not set.
+ }
+
StreamDesc streamDesc = new StreamDesc();
streamDesc.setSchema(copied);
- streamDesc.setSink(streamSinkConfig);
+ streamDesc.setSinkConfig(streamSinkConfig);
+ streamDesc.setSourceConfig(streamSourceConfig);
streamDesc.setStreamId(copied.getStreamId());
+
return streamDesc;
})).collect(Collectors.toList());
metadata.setStreams(streamDescToInstall);
- // TODO: Decouple converting from StreamSink to Alert DataSource
// iterate each stream descriptor and create alert datasource for each
for (StreamDesc streamDesc : streamDescToInstall) {
// only take care of Kafka sink
- if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
- KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink();
+ if (streamDesc.getSinkConfig() instanceof KafkaStreamSinkConfig) {
+ KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSinkConfig();
Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
datasource.setType("KAFKA");
datasource.setName(metadata.getAppId());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
index cbbd438..aa62251 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
@@ -31,7 +31,7 @@ import java.util.ServiceLoader;
import java.util.function.Function;
/**
- * Load Application Provider with SPI, by default from current class loader.
+ * Load Application KafkaStreamMessaging with SPI, by default from current class loader.
*/
public class ApplicationProviderSPILoader extends ApplicationProviderLoader {
private final String appProviderExtDir;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
deleted file mode 100644
index b3d984e..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-public class DefaultStreamSinkConfig implements StreamSinkConfig {
- private final Class<?> streamPersistClass;
- private static final String NONE_STORAGE_TYPE = "NONE";
-
- public DefaultStreamSinkConfig(Class<?> streamPersistClass) {
- this.streamPersistClass = streamPersistClass;
- }
-
- @Override
- public String getType() {
- return NONE_STORAGE_TYPE;
- }
-
- public Class<?> getSinkType() {
- return streamPersistClass;
- }
-
- @Override
- public Class<? extends StreamSinkConfig> getConfigType() {
- return DefaultStreamSinkConfig.class;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
deleted file mode 100644
index 3649d72..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import backtype.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class FlattenEventMapper implements StreamEventMapper {
- private final String streamId;
- private static final String TIMESTAMP_FIELD = "timestamp";
- private static final Logger LOGGER = LoggerFactory.getLogger(FlattenEventMapper.class);
-
- public FlattenEventMapper(String streamId) {
- this.streamId = streamId;
- }
-
- @Override
- public List<StreamEvent> map(Tuple tuple) throws Exception {
- long timestamp;
- if (tuple.getFields().contains(TIMESTAMP_FIELD)) {
- try {
- timestamp = tuple.getLongByField("timestamp");
- } catch (Exception ex) {
- // if timestamp is not null
- LOGGER.error(ex.getMessage(), ex);
- timestamp = 0;
- }
- } else {
- timestamp = System.currentTimeMillis();
- }
- Object[] values = new Object[tuple.getFields().size()];
- for (int i = 0; i < tuple.getFields().size(); i++) {
- values[i] = tuple.getValue(i);
- }
- StreamEvent event = new StreamEvent();
- event.setTimestamp(timestamp);
- event.setStreamId(streamId);
- event.setData(values);
- return Collections.singletonList(event);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
deleted file mode 100644
index 12acb81..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class);
- private String topicId;
- private Producer producer;
- private KafkaStreamSinkConfig config;
-
- @Override
- public void init(String streamId, KafkaStreamSinkConfig config) {
- super.init(streamId, config);
- this.topicId = config.getTopicId();
- this.config = config;
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- super.prepare(stormConf, context, collector);
- Properties properties = new Properties();
- properties.put("metadata.broker.list", config.getBrokerList());
- properties.put("serializer.class", config.getSerializerClass());
- properties.put("key.serializer.class", config.getKeySerializerClass());
- // new added properties for async producer
- properties.put("producer.type", config.getProducerType());
- properties.put("batch.num.messages", config.getNumBatchMessages());
- properties.put("request.required.acks", config.getRequestRequiredAcks());
- properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
- ProducerConfig producerConfig = new ProducerConfig(properties);
- producer = new Producer(producerConfig);
- }
-
- @Override
- protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
- try {
- String output = new ObjectMapper().writeValueAsString(event);
- // partition key may cause data skew
- //producer.send(new KeyedMessage(this.topicId, key, output));
- producer.send(new KeyedMessage(this.topicId, output));
- } catch (Exception ex) {
- LOG.error(ex.getMessage(), ex);
- throw ex;
- }
- }
-
- @Override
- public void afterInstall() {
- ensureTopicCreated();
- }
-
- private void ensureTopicCreated() {
- LOG.info("TODO: ensure kafka topic {} created", this.topicId);
- }
-
- private void ensureTopicDeleted() {
- LOG.info("TODO: ensure kafka topic {} deleted", this.topicId);
- }
-
- @Override
- public void cleanup() {
- if (this.producer != null) {
- this.producer.close();
- }
- }
-
- @Override
- public void afterUninstall() {
- ensureTopicDeleted();
- }
-
- public static class Provider implements StreamSinkProvider<KafkaStreamSink, KafkaStreamSinkConfig> {
- private static final Logger LOG = LoggerFactory.getLogger(Provider.class);
- private static final String DEAULT_SHARED_TOPIC_CONF_KEY = "dataSinkConfig.topic";
-
- private String getStreamSpecificTopicConfigKey(String streamId) {
- return String.format("dataSinkConfig.%s.topic",streamId);
- }
-
- @Override
- public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
- KafkaStreamSinkConfig desc = new KafkaStreamSinkConfig();
- String streamSpecificTopicConfigKey = getStreamSpecificTopicConfigKey(streamId);
- if (config.hasPath(streamSpecificTopicConfigKey)) {
- desc.setTopicId(config.getString(streamSpecificTopicConfigKey));
- } else if (config.hasPath(DEAULT_SHARED_TOPIC_CONF_KEY)) {
- desc.setTopicId(config.getString(DEAULT_SHARED_TOPIC_CONF_KEY));
- LOG.warn("Using default shared topic {}: {}", DEAULT_SHARED_TOPIC_CONF_KEY, desc.getTopicId());
- } else {
- LOG.error("Neither stream specific topic: {} nor default shared topic: {} found in config", streamSpecificTopicConfigKey, DEAULT_SHARED_TOPIC_CONF_KEY);
- throw new IllegalArgumentException("Neither stream specific topic: "
- + streamSpecificTopicConfigKey + " nor default shared topic: " + DEAULT_SHARED_TOPIC_CONF_KEY + " found in config");
- }
- desc.setBrokerList(config.getString("dataSinkConfig.brokerList"));
- desc.setSerializerClass(config.hasPath("dataSinkConfig.serializerClass")
- ? config.getString("dataSinkConfig.serializerClass") : "kafka.serializer.StringEncoder");
- desc.setKeySerializerClass(config.hasPath("dataSinkConfig.keySerializerClass")
- ? config.getString("dataSinkConfig.keySerializerClass") : "kafka.serializer.StringEncoder");
-
- // new added properties for async producer
- desc.setNumBatchMessages(config.hasPath("dataSinkConfig.numBatchMessages")
- ? config.getString("dataSinkConfig.numBatchMessages") : "1024");
- desc.setProducerType(config.hasPath("dataSinkConfig.producerType")
- ? config.getString("dataSinkConfig.producerType") : "async");
- desc.setMaxQueueBufferMs(config.hasPath("dataSinkConfig.maxQueueBufferMs")
- ? config.getString("dataSinkConfig.maxQueueBufferMs") : "3000");
- desc.setRequestRequiredAcks(config.hasPath("dataSinkConfig.requestRequiredAcks")
- ? config.getString("dataSinkConfig.requestRequiredAcks") : "1");
- return desc;
- }
-
- @Override
- public KafkaStreamSink getSink() {
- return new KafkaStreamSink();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
deleted file mode 100644
index d5479df..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-public class KafkaStreamSinkConfig implements StreamSinkConfig {
- private String topicId;
- private String brokerList;
- private String serializerClass;
- private String keySerializerClass;
- private String numBatchMessages;
- private String maxQueueBufferMs;
- private String producerType;
- private String requestRequiredAcks;
-
- public String getTopicId() {
- return topicId;
- }
-
- public void setTopicId(String topicId) {
- this.topicId = topicId;
- }
-
- public String getBrokerList() {
- return brokerList;
- }
-
- public void setBrokerList(String brokerList) {
- this.brokerList = brokerList;
- }
-
- public String getSerializerClass() {
- return serializerClass;
- }
-
- public void setSerializerClass(String serializerClass) {
- this.serializerClass = serializerClass;
- }
-
- public String getKeySerializerClass() {
- return keySerializerClass;
- }
-
- public void setKeySerializerClass(String keySerializerClass) {
- this.keySerializerClass = keySerializerClass;
- }
-
- public String getNumBatchMessages() {
- return numBatchMessages;
- }
-
- public void setNumBatchMessages(String numBatchMessages) {
- this.numBatchMessages = numBatchMessages;
- }
-
- public String getMaxQueueBufferMs() {
- return maxQueueBufferMs;
- }
-
- public void setMaxQueueBufferMs(String maxQueueBufferMs) {
- this.maxQueueBufferMs = maxQueueBufferMs;
- }
-
- public String getProducerType() {
- return producerType;
- }
-
- public void setProducerType(String producerType) {
- this.producerType = producerType;
- }
-
- public String getRequestRequiredAcks() {
- return requestRequiredAcks;
- }
-
- public void setRequestRequiredAcks(String requestRequiredAcks) {
- this.requestRequiredAcks = requestRequiredAcks;
- }
-
- @Override
- public String getType() {
- return "KAFKA";
- }
-
- @Override
- public Class<?> getSinkType() {
- return KafkaStreamSink.class;
- }
-
- @Override
- public Class<? extends StreamSinkConfig> getConfigType() {
- return KafkaStreamSinkConfig.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
deleted file mode 100644
index 3a02caf..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.OutputCollector;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig> {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
-
- @Override
- public void afterInstall() {
- LOGGER.info("Executing afterInstall callback, do nothing");
- }
-
- @Override
- public void afterUninstall() {
- LOGGER.info("Executing afterUninstall callback, do nothing");
- }
-
- @Override
- protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
- LOGGER.info("Receiving {}", event);
- }
-
- public static class Provider implements StreamSinkProvider<LoggingStreamSink, DefaultStreamSinkConfig> {
- @Override
- public DefaultStreamSinkConfig getSinkConfig(String streamId, Config config) {
- return new DefaultStreamSinkConfig(LoggingStreamSink.class);
- }
-
- @Override
- public LoggingStreamSink getSink() {
- return new LoggingStreamSink();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
deleted file mode 100644
index ad40772..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseRichBolt implements StreamSink<K> {
- private static final Logger LOG = LoggerFactory.getLogger(StormStreamSink.class);
- private String streamId;
- private OutputCollector collector;
-
- @Override
- public void init(String streamId, K config) {
- this.streamId = streamId;
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- /**
- * Implicitly hides the Tuple protocol inside code as Tuple[Key,Map].
- */
- @Override
- public void execute(Tuple input) {
- try {
- Map event = null;
- Object key = input.getValue(0);
- if (input.size() < 2) {
- event = tupleAsMap(input);
- } else {
- Object value = input.getValue(1);
- if (value != null) {
- if (value instanceof Map) {
- event = (Map) input.getValue(1);
- } else {
- event = tupleAsMap(input);
- }
- }
- }
- execute(key, event, collector);
- collector.ack(input);
- } catch (Exception ex) {
- LOG.error(ex.getMessage(), ex);
- collector.reportError(ex);
- }
- }
-
- protected abstract void execute(Object key, Map event, OutputCollector collector) throws Exception;
-
- private Map tupleAsMap(Tuple tuple) {
- Map values = new HashMap<>();
- for (String field : tuple.getFields()) {
- values.put(field, tuple.getValueByField(field));
- }
- return values;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- public String getStreamId() {
- return streamId;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
deleted file mode 100644
index 710e4d2..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import backtype.storm.tuple.Tuple;
-
-import java.io.Serializable;
-import java.util.List;
-
-@FunctionalInterface
-public interface StreamEventMapper extends Serializable {
- /**
- * Map from storm tuple to Stream Event.
- *
- * @param tuple
- * @return
- * @throws Exception
- */
- List<StreamEvent> map(Tuple tuple) throws Exception;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
deleted file mode 100644
index ee2790c..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.app.ApplicationLifecycle;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-public interface StreamSink<T extends StreamSinkConfig> extends ApplicationLifecycle {
- void init(String streamId,T config);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
deleted file mode 100644
index 0425fa6..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-import com.typesafe.config.Config;
-
-import java.lang.reflect.ParameterizedType;
-
-public interface StreamSinkProvider<S extends StreamSink<D>, D extends StreamSinkConfig> {
- D getSinkConfig(String streamId, Config config);
-
- S getSink();
-
- default S getSink(String streamId, Config config) {
- S s = getSink();
- s.init(streamId, getSinkConfig(streamId, config));
- return s;
- }
-
- default Class<? extends S> getSinkType() {
- return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
- }
-
- default Class<? extends D> getSinkConfigType() {
- return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index 143e026..361b4c6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -28,7 +28,7 @@ import java.lang.reflect.ParameterizedType;
import java.util.Optional;
/**
- * Application Service Provider Interface.
+ * Application Service KafkaStreamMessaging Interface.
*
* @param <T> Application Type.
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/StreamConvertHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/StreamConvertHelper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/StreamConvertHelper.java
new file mode 100644
index 0000000..c25a93b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/StreamConvertHelper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.utils;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.common.utils.Tuple2;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class StreamConvertHelper {
+ private static Map tupleToMap(Tuple tuple) {
+ Map values = new HashMap<>();
+ for (String field : tuple.getFields()) {
+ values.put(field, tuple.getValueByField(field));
+ }
+ return values;
+ }
+
+ public static Tuple2<Object,Map> tupleToEvent(Tuple input) {
+ Map event = null;
+ Object key = input.getValue(0);
+ if (input.size() < 2) {
+ event = StreamConvertHelper.tupleToMap(input);
+ } else {
+ Object value = input.getValue(1);
+ if (value != null) {
+ if (value instanceof Map) {
+ event = (Map) input.getValue(1);
+ } else {
+ event = StreamConvertHelper.tupleToMap(input);
+ }
+ }
+ }
+ return new Tuple2<>(key, event);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
index a47e30a..7977650 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
@@ -23,13 +23,10 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
-import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.common.module.ModuleRegistry;
import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
import org.junit.Ignore;
import java.util.Arrays;
@@ -61,7 +58,7 @@ public class TestStormApplication extends StormApplication{
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","fromStream","value"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java
index de64e6a..906bf66 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java
@@ -34,6 +34,6 @@ public class StaticEnvironmentTest {
*/
@Test
public void testStreamSinkSupport(){
- environment.streamSink();
+ environment.stream();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
index 5733531..eda9383 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
@@ -54,7 +54,7 @@ public class MockStormApplication extends StormApplication {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","fromStream","value"));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
new file mode 100644
index 0000000..b1a4193
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.stream;
+
+import org.apache.eagle.app.environment.builder.CEPFunction;
+import org.apache.eagle.app.environment.builder.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CEPFunctionTest {
+ @Test @Ignore("TODO: Not implement yet")
+ public void testSiddhiFunction() {
+ CEPFunction function = new CEPFunction(
+ "define stream inputStream (name string, value double);\n "
+ + "from inputStream#window.timeBatch( 1 min ) \n" +
+ "select name, avg(value) as avgValue\n" +
+ "group by name \n" +
+ "insert into outputStream ",
+ "inputStream","outputStream");
+ Collector collector = new Collector() {
+ @Override
+ public void collect(Object key, Map event) {
+
+ }
+ };
+ function.open(collector);
+ function.transform(new HashMap<String,Object>() {{
+ put("name","cpu.usage");
+ put("value", 0.98);
+ }});
+ function.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
index 4dbf0c4..377eed1 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -43,9 +43,8 @@
"store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore"
},
"application":{
- "sink":{
- "type": "org.apache.eagle.app.sink.KafkaStreamSink"
- "boostrap.server":"localhost:9092"
+ "stream": {
+ "provider": "org.apache.eagle.app.messaging.KafkaStreamProvider"
}
"provider":{
"loader": "org.apache.eagle.app.service.impl.ApplicationProviderSPILoader"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/Tuple2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/Tuple2.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/Tuple2.java
new file mode 100644
index 0000000..ac3dc7c
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/Tuple2.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.utils;
+
+import java.io.Serializable;
+
+public class Tuple2<F1, F2> implements Serializable {
+ private final F1 f1;
+ private final F2 f2;
+
+ public Tuple2(F1 f1, F2 f2) {
+ this.f1 = f1;
+ this.f2 = f2;
+ }
+
+ public F2 f1() {
+ return f2;
+ }
+
+ public F1 f0() {
+ return f1;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(%s,%s)", this.f1, this.f2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
index 47d16fd..a183351 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
@@ -19,10 +19,13 @@ package org.apache.eagle.metadata.model;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.metadata.utils.StreamIdConversions;
+import javax.xml.transform.stream.StreamSource;
+
public class StreamDesc {
private String streamId;
private StreamDefinition schema;
- private StreamSinkConfig sink;
+ private StreamSinkConfig sinkConfig;
+ private StreamSourceConfig sourceConfig;
public String getStreamId() {
return streamId;
@@ -40,11 +43,19 @@ public class StreamDesc {
this.schema = streamSchema;
}
- public StreamSinkConfig getSink() {
- return sink;
+ public StreamSinkConfig getSinkConfig() {
+ return sinkConfig;
+ }
+
+ public void setSinkConfig(StreamSinkConfig sinkDesc) {
+ this.sinkConfig = sinkDesc;
+ }
+
+ public StreamSourceConfig getSourceConfig() {
+ return sourceConfig;
}
- public void setSink(StreamSinkConfig sinkDesc) {
- this.sink = sinkDesc;
+ public void setSourceConfig(StreamSourceConfig sourceConfig) {
+ this.sourceConfig = sourceConfig;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSourceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSourceConfig.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSourceConfig.java
new file mode 100644
index 0000000..4034221
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSourceConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metadata.model;
+
+import java.io.Serializable;
+
+public interface StreamSourceConfig extends Serializable {
+ String getType();
+
+ Class<?> getSourceType();
+
+ Class<? extends StreamSourceConfig> getConfigType();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml b/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml
index 82e84ed..d408c5e 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml
@@ -1,3 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
<configuration>
<property>
<name>testkey1</name>