You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/30 01:38:28 UTC

[4/5] incubator-eagle git commit: [EAGLE-806] Integrate Metric Process and Persistence with Application Framework

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
new file mode 100644
index 0000000..4a72709
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+/**
+ * FIXME Rename to KafkaStreamMessagingConfig.
+ */
+public class KafkaStreamSinkConfig implements StreamSinkConfig {
+    // Write Config
+    private String topicId;
+    private String brokerList;
+    private String serializerClass;
+    private String keySerializerClass;
+    private String numBatchMessages;
+    private String maxQueueBufferMs;
+    private String producerType;
+    private String requestRequiredAcks;
+
+    public String getTopicId() {
+        return topicId;
+    }
+
+    public void setTopicId(String topicId) {
+        this.topicId = topicId;
+    }
+
+    public String getBrokerList() {
+        return brokerList;
+    }
+
+    public void setBrokerList(String brokerList) {
+        this.brokerList = brokerList;
+    }
+
+    public String getSerializerClass() {
+        return serializerClass;
+    }
+
+    public void setSerializerClass(String serializerClass) {
+        this.serializerClass = serializerClass;
+    }
+
+    public String getKeySerializerClass() {
+        return keySerializerClass;
+    }
+
+    public void setKeySerializerClass(String keySerializerClass) {
+        this.keySerializerClass = keySerializerClass;
+    }
+
+    public String getNumBatchMessages() {
+        return numBatchMessages;
+    }
+
+    public void setNumBatchMessages(String numBatchMessages) {
+        this.numBatchMessages = numBatchMessages;
+    }
+
+    public String getMaxQueueBufferMs() {
+        return maxQueueBufferMs;
+    }
+
+    public void setMaxQueueBufferMs(String maxQueueBufferMs) {
+        this.maxQueueBufferMs = maxQueueBufferMs;
+    }
+
+    public String getProducerType() {
+        return producerType;
+    }
+
+    public void setProducerType(String producerType) {
+        this.producerType = producerType;
+    }
+
+    public String getRequestRequiredAcks() {
+        return requestRequiredAcks;
+    }
+
+    public void setRequestRequiredAcks(String requestRequiredAcks) {
+        this.requestRequiredAcks = requestRequiredAcks;
+    }
+
+    @Override
+    public String getType() {
+        return "KAFKA";
+    }
+
+    @Override
+    public Class<?> getSinkType() {
+        return KafkaStreamSink.class;
+    }
+
+    @Override
+    public Class<? extends StreamSinkConfig> getConfigType() {
+        return KafkaStreamSinkConfig.class;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
new file mode 100644
index 0000000..9edf644
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.spout.SchemeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.*;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class KafkaStreamSource extends StormStreamSource<KafkaStreamSourceConfig> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSource.class);
+    private KafkaSpout spout;
+
+    @Override
+    public void init(String streamId, KafkaStreamSourceConfig config) {
+        this.spout = createKafkaSpout(config);
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.spout.open(conf, context, collector);
+    }
+
+    @Override
+    public void nextTuple() {
+        this.spout.nextTuple();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        this.spout.declareOutputFields(declarer);
+    }
+
+    @Override
+    public void close() {
+        this.spout.close();
+    }
+
+    @Override
+    public void activate() {
+        this.spout.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        this.spout.deactivate();
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        this.spout.ack(msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        this.spout.fail(msgId);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return this.spout.getComponentConfiguration();
+    }
+
+    // ----------------
+    //  Helper Methods
+    // ----------------
+
+    private static KafkaSpout createKafkaSpout(KafkaStreamSourceConfig config) {
+
+        // the following is for fetching data from one topic
+        // Kafka topic
+        String topic = config.getTopicId();
+        // Kafka broker zk connection
+        String zkConnString = config.getBrokerZkQuorum();
+        // Kafka fetch size
+        int fetchSize = config.getFetchSize();
+        LOG.info(String.format("Use topic : %s, zkQuorum : %s , fetchSize : %d", topic, zkConnString, fetchSize));
+
+        /*
+         the following is for recording offset for processing the data
+         the zk path to store current offset is comprised of the following
+         offset zkPath = zkRoot + "/" + topic + "/" + consumerGroupId + "/" + partition_Id
+
+         consumerGroupId is for differentiating different consumers which consume the same topic
+        */
+        // transaction zkRoot
+        String zkRoot = config.getTransactionZKRoot();
+        // Kafka consumer group id
+        String groupId = config.getConsumerGroupId();
+        String brokerZkPath = config.getBrokerZkPath();
+
+        BrokerHosts hosts;
+        if (brokerZkPath == null) {
+            hosts = new ZkHosts(zkConnString);
+        } else {
+            hosts = new ZkHosts(zkConnString, brokerZkPath);
+        }
+
+        SpoutConfig spoutConfig = new SpoutConfig(hosts,
+            topic,
+            zkRoot + "/" + topic,
+            groupId);
+
+        // transaction zkServers to store kafka consumer offset. Default to use storm zookeeper
+        if (config.getTransactionZkServers() != null) {
+            String[] txZkServers = config.getTransactionZkServers().split(",");
+            spoutConfig.zkServers = Arrays.asList(txZkServers).stream().map(server -> server.split(":")[0]).collect(Collectors.toList());
+            spoutConfig.zkPort = Integer.parseInt(txZkServers[0].split(":")[1]);
+            LOG.info("txZkServers:" + spoutConfig.zkServers + ", zkPort:" + spoutConfig.zkPort);
+        }
+
+        // transaction update interval
+        spoutConfig.stateUpdateIntervalMs = config.getTransactionStateUpdateMS();
+        // Kafka fetch size
+        spoutConfig.fetchSizeBytes = fetchSize;
+        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
+
+        // "startOffsetTime" is for test usage, prod should not use this
+        if (config.getStartOffsetTime() >= 0) {
+            spoutConfig.startOffsetTime = config.getStartOffsetTime();
+        }
+        // "forceFromStart" is for test usage, prod should not use this
+        if (config.isForceFromStart()) {
+            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+        }
+
+        Preconditions.checkNotNull(config.getSchemaClass(), "schemaClass is null");
+        try {
+            Scheme s = config.getSchemaClass().newInstance();
+            spoutConfig.scheme = new SchemeAsMultiScheme(s);
+        } catch (Exception ex) {
+            LOG.error("Error instantiating scheme object");
+            throw new IllegalStateException(ex);
+        }
+        return new KafkaSpout(spoutConfig);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
new file mode 100644
index 0000000..5a9c162
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.metadata.model.StreamSourceConfig;
+
+public class KafkaStreamSourceConfig implements StreamSourceConfig {
+
+    private static final String DEFAULT_CONFIG_PREFIX = "dataSourceConfig";
+    private static final String DEFAULT_CONSUMER_GROUP_ID = "eagleKafkaSource";
+    private static final String DEFAULT_TRANSACTION_ZK_ROOT = "/consumers";
+    private static final Class<? extends backtype.storm.spout.Scheme> DEFAULT_KAFKA_SCHEMA = JsonSchema.class;
+
+    // Read Config
+    private String topicId;
+    private String brokerZkQuorum;
+    private String brokerZkBasePath;
+    private String transactionZkServers;
+
+    private int fetchSize = 1048576;
+    private String transactionZKRoot = DEFAULT_TRANSACTION_ZK_ROOT;
+    private String consumerGroupId = DEFAULT_CONSUMER_GROUP_ID;
+    private String brokerZkPath = null;
+    private long transactionStateUpdateMS = 2000;
+    private int startOffsetTime = -1;
+    private boolean forceFromStart = false;
+    private Class<? extends backtype.storm.spout.Scheme> schemaClass = DEFAULT_KAFKA_SCHEMA;
+
+    public String getBrokerZkQuorum() {
+        return brokerZkQuorum;
+    }
+
+    public void setBrokerZkQuorum(String brokerZkQuorum) {
+        this.brokerZkQuorum = brokerZkQuorum;
+    }
+
+    public String getBrokerZkBasePath() {
+        return brokerZkBasePath;
+    }
+
+    public void setBrokerZkBasePath(String brokerZkBasePath) {
+        this.brokerZkBasePath = brokerZkBasePath;
+    }
+
+    public int getFetchSize() {
+        return fetchSize;
+    }
+
+    public void setFetchSize(int fetchSize) {
+        this.fetchSize = fetchSize;
+    }
+
+    public String getTransactionZKRoot() {
+        return transactionZKRoot;
+    }
+
+    public void setTransactionZKRoot(String transactionZKRoot) {
+        this.transactionZKRoot = transactionZKRoot;
+    }
+
+    public String getConsumerGroupId() {
+        return consumerGroupId;
+    }
+
+    public void setConsumerGroupId(String consumerGroupId) {
+        this.consumerGroupId = consumerGroupId;
+    }
+
+    public String getBrokerZkPath() {
+        return brokerZkPath;
+    }
+
+    public void setBrokerZkPath(String brokerZkPath) {
+        this.brokerZkPath = brokerZkPath;
+    }
+
+    public String getTransactionZkServers() {
+        return transactionZkServers;
+    }
+
+    public void setTransactionZkServers(String transactionZkServers) {
+        this.transactionZkServers = transactionZkServers;
+    }
+
+    public long getTransactionStateUpdateMS() {
+        return transactionStateUpdateMS;
+    }
+
+    public void setTransactionStateUpdateMS(long transactionStateUpdateMS) {
+        this.transactionStateUpdateMS = transactionStateUpdateMS;
+    }
+
+    public int getStartOffsetTime() {
+        return startOffsetTime;
+    }
+
+    public void setStartOffsetTime(int startOffsetTime) {
+        this.startOffsetTime = startOffsetTime;
+    }
+
+    public boolean isForceFromStart() {
+        return forceFromStart;
+    }
+
+    public void setForceFromStart(boolean forceFromStart) {
+        this.forceFromStart = forceFromStart;
+    }
+
+    public Class<? extends backtype.storm.spout.Scheme> getSchemaClass() {
+        return schemaClass;
+    }
+
+    public void setSchemaClass(Class<? extends backtype.storm.spout.Scheme> schemaClass) {
+        this.schemaClass = schemaClass;
+    }
+
+    @Override
+    public String getType() {
+        return "KAFKA";
+    }
+
+    @Override
+    public Class<?> getSourceType() {
+        return KafkaStreamSource.class;
+    }
+
+    @Override
+    public Class<? extends StreamSourceConfig> getConfigType() {
+        return StreamSourceConfig.class;
+    }
+
+    public String getTopicId() {
+        return topicId;
+    }
+
+    public void setTopicId(String topicId) {
+        this.topicId = topicId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
new file mode 100644
index 0000000..32b168b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.BatchSender;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricStreamPersist extends BaseRichBolt  {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
+
+    private final Config config;
+    private final MetricMapper mapper;
+    private final int batchSize;
+    private IEagleServiceClient client;
+    private OutputCollector collector;
+    private BatchSender batchSender;
+
+    public MetricStreamPersist(MetricDefinition metricDefinition, Config config) {
+        this.config = config;
+        this.mapper = new StructuredMetricMapper(metricDefinition);
+        this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
+    }
+
+    public MetricStreamPersist(MetricMapper mapper, Config config) {
+        this.config = config;
+        this.mapper = mapper;
+        this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.client = new EagleServiceClientImpl(config);
+        if (this.batchSize > 0) {
+            this.batchSender = client.batch(this.batchSize);
+        }
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        try {
+            GenericMetricEntity metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
+            if (batchSize <= 1) {
+                GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity));
+                if (!response.isSuccess()) {
+                    LOG.error("Service side error: {}", response.getException());
+                    collector.reportError(new IllegalStateException(response.getException()));
+                } else {
+                    collector.ack(input);
+                }
+            } else {
+                this.batchSender.send(metricEntity);
+                collector.ack(input);
+            }
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            collector.reportError(ex);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1"));
+    }
+
+    @Override
+    public void cleanup() {
+        try {
+            this.client.close();
+        } catch (IOException e) {
+            LOG.error("Close client error: {}", e.getMessage(), e);
+        } finally {
+            super.cleanup();
+        }
+    }
+
+    @FunctionalInterface
+    public interface MetricMapper extends Serializable {
+        GenericMetricEntity map(Map event);
+    }
+
+    public class StructuredMetricMapper implements MetricMapper {
+        private final MetricDefinition metricDefinition;
+
+        private StructuredMetricMapper(MetricDefinition metricDefinition) {
+            this.metricDefinition = metricDefinition;
+        }
+
+        @Override
+        public GenericMetricEntity map(Map event) {
+            String metricName = metricDefinition.getNameSelector().getMetricName(event);
+            Preconditions.checkNotNull(metricName,"Metric name is null");
+            Long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event);
+            Preconditions.checkNotNull(timestamp, "Timestamp is null");
+            Map<String,String> tags = new HashMap<>();
+            for (String dimensionField: metricDefinition.getDimensionFields()) {
+                Preconditions.checkNotNull(dimensionField,"Dimension field name is null");
+                tags.put(dimensionField, (String) event.get(dimensionField));
+            }
+
+            double[] values;
+            if (event.containsKey(metricDefinition.getValueField())) {
+                values = new double[] {(double) event.get(metricDefinition.getValueField())};
+            } else {
+                LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(), event);
+                values = new double[]{0};
+            }
+
+            GenericMetricEntity entity = new GenericMetricEntity();
+            entity.setPrefix(metricName);
+            entity.setTimestamp(timestamp);
+            entity.setTags(tags);
+            entity.setValue(values);
+            return entity;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
new file mode 100644
index 0000000..ef6c8d6
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.apache.eagle.common.utils.Tuple2;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseRichBolt implements StreamSink<K> {
+    private static final Logger LOG = LoggerFactory.getLogger(StormStreamSink.class);
+    private String streamId;
+    private OutputCollector collector;
+
+    @Override
+    public void init(String streamId, K config) {
+        this.streamId = streamId;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    /**
+     * Implicitly hides the Tuple protocol inside code as Tuple[Key,Map].
+     */
+    @Override
+    public void execute(Tuple input) {
+        try {
+            Tuple2<Object,Map> keyValue = StreamConvertHelper.tupleToEvent(input);
+            execute(keyValue.f0(), keyValue.f1(), collector);
+            collector.ack(input);
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            collector.reportError(ex);
+        }
+    }
+
+    protected abstract void execute(Object key, Map event, OutputCollector collector) throws Exception;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
new file mode 100644
index 0000000..b31de46
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.topology.base.BaseRichSpout;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+import org.apache.eagle.metadata.model.StreamSourceConfig;
+
+public abstract class StormStreamSource<T extends StreamSourceConfig> extends BaseRichSpout implements StreamSource<T> {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
new file mode 100644
index 0000000..42ea36e
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import backtype.storm.tuple.Tuple;
+
+import java.io.Serializable;
+import java.util.List;
+
+@FunctionalInterface
+public interface StreamEventMapper extends Serializable {
+    /**
+     * Map from storm tuple to Stream Event.
+     *
+     * @param tuple
+     * @return
+     * @throws Exception
+     */
+    List<StreamEvent> map(Tuple tuple) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
new file mode 100644
index 0000000..0dbc1a7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+import org.apache.eagle.metadata.model.StreamSourceConfig;
+
+import java.lang.reflect.ParameterizedType;
+
+/**
+ * Stream Messaging Bus.
+ */
+public interface StreamProvider<W extends StreamSink<C>, C extends StreamSinkConfig,
+        R extends StreamSource<F>, F extends StreamSourceConfig> {
+
+    C getSinkConfig(String streamId, Config config);
+
+    W getSink();
+
+    default W getSink(String streamId, Config config) {
+        W s = getSink();
+        s.init(streamId, getSinkConfig(streamId, config));
+        return s;
+    }
+
+    F getSourceConfig(String streamId, Config config);
+
+    R getSource();
+
+    default R getSource(String streamId, Config config) {
+        R i = getSource();
+        i.init(streamId, getSourceConfig(streamId, config));
+        return i;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
new file mode 100644
index 0000000..7ba4a9a
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSink.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.app.ApplicationLifecycle;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+public interface StreamSink<T extends StreamSinkConfig> extends ApplicationLifecycle {
+    void init(String streamId,T config);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
new file mode 100644
index 0000000..af3965f
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import org.apache.eagle.metadata.model.StreamSourceConfig;
+
+public interface StreamSource<T extends StreamSourceConfig> {
+    void init(String streamId, T config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index bd0adfe..25b5978 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -29,8 +29,9 @@ import org.apache.eagle.alert.metric.MetricConfigs;
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.environment.ExecutionRuntime;
 import org.apache.eagle.app.environment.ExecutionRuntimeManager;
-import org.apache.eagle.app.sink.KafkaStreamSinkConfig;
+import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
 import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.StreamSourceConfig;
 import org.apache.eagle.metadata.utils.StreamIdConversions;
 import org.apache.eagle.metadata.model.StreamDesc;
 import org.apache.eagle.metadata.model.StreamSinkConfig;
@@ -101,21 +102,32 @@ public class ApplicationAction implements Serializable {
             copied.setSiteId(metadata.getSite().getSiteId());
             copied.setStreamId(StreamIdConversions.formatSiteStreamId(metadata.getSite().getSiteId(), copied.getStreamId()));
             StreamSinkConfig streamSinkConfig = this.runtime.environment()
-                    .streamSink().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig);
+                    .stream().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig);
+
+            StreamSourceConfig streamSourceConfig = null;
+
+            try {
+                streamSourceConfig = this.runtime.environment()
+                    .stream().getSourceConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig);
+            } catch (Throwable throwable) {
+                // Ignore source config if not set.
+            }
+
             StreamDesc streamDesc = new StreamDesc();
             streamDesc.setSchema(copied);
-            streamDesc.setSink(streamSinkConfig);
+            streamDesc.setSinkConfig(streamSinkConfig);
+            streamDesc.setSourceConfig(streamSourceConfig);
             streamDesc.setStreamId(copied.getStreamId());
+
             return streamDesc;
         })).collect(Collectors.toList());
         metadata.setStreams(streamDescToInstall);
 
-        // TODO: Decouple converting from StreamSink to Alert DataSource
         // iterate each stream descriptor and create alert datasource for each
         for (StreamDesc streamDesc : streamDescToInstall) {
             // only take care of Kafka sink
-            if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
-                KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink();
+            if (streamDesc.getSinkConfig() instanceof KafkaStreamSinkConfig) {
+                KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSinkConfig();
                 Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
                 datasource.setType("KAFKA");
                 datasource.setName(metadata.getAppId());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
index cbbd438..aa62251 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
@@ -31,7 +31,7 @@ import java.util.ServiceLoader;
 import java.util.function.Function;
 
 /**
- * Load Application Provider with SPI, by default from current class loader.
+ * Load Application KafkaStreamMessaging with SPI, by default from current class loader.
  */
 public class ApplicationProviderSPILoader extends ApplicationProviderLoader {
     private final String appProviderExtDir;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
deleted file mode 100644
index b3d984e..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/DefaultStreamSinkConfig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-public class DefaultStreamSinkConfig implements StreamSinkConfig {
-    private final Class<?> streamPersistClass;
-    private static final String NONE_STORAGE_TYPE = "NONE";
-
-    public DefaultStreamSinkConfig(Class<?> streamPersistClass) {
-        this.streamPersistClass = streamPersistClass;
-    }
-
-    @Override
-    public String getType() {
-        return NONE_STORAGE_TYPE;
-    }
-
-    public Class<?> getSinkType() {
-        return streamPersistClass;
-    }
-
-    @Override
-    public Class<? extends StreamSinkConfig> getConfigType() {
-        return DefaultStreamSinkConfig.class;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
deleted file mode 100644
index 3649d72..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/FlattenEventMapper.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import backtype.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class FlattenEventMapper implements StreamEventMapper {
-    private final String streamId;
-    private static final String TIMESTAMP_FIELD = "timestamp";
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlattenEventMapper.class);
-
-    public FlattenEventMapper(String streamId) {
-        this.streamId = streamId;
-    }
-
-    @Override
-    public List<StreamEvent> map(Tuple tuple) throws Exception {
-        long timestamp;
-        if (tuple.getFields().contains(TIMESTAMP_FIELD)) {
-            try {
-                timestamp = tuple.getLongByField("timestamp");
-            } catch (Exception ex) {
-                // if timestamp is not null
-                LOGGER.error(ex.getMessage(), ex);
-                timestamp = 0;
-            }
-        } else {
-            timestamp = System.currentTimeMillis();
-        }
-        Object[] values = new Object[tuple.getFields().size()];
-        for (int i = 0; i < tuple.getFields().size(); i++) {
-            values[i] = tuple.getValue(i);
-        }
-        StreamEvent event = new StreamEvent();
-        event.setTimestamp(timestamp);
-        event.setStreamId(streamId);
-        event.setData(values);
-        return Collections.singletonList(event);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
deleted file mode 100644
index 12acb81..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class);
-    private String topicId;
-    private Producer producer;
-    private KafkaStreamSinkConfig config;
-
-    @Override
-    public void init(String streamId, KafkaStreamSinkConfig config) {
-        super.init(streamId, config);
-        this.topicId = config.getTopicId();
-        this.config = config;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        super.prepare(stormConf, context, collector);
-        Properties properties = new Properties();
-        properties.put("metadata.broker.list", config.getBrokerList());
-        properties.put("serializer.class", config.getSerializerClass());
-        properties.put("key.serializer.class", config.getKeySerializerClass());
-        // new added properties for async producer
-        properties.put("producer.type", config.getProducerType());
-        properties.put("batch.num.messages", config.getNumBatchMessages());
-        properties.put("request.required.acks", config.getRequestRequiredAcks());
-        properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
-        ProducerConfig producerConfig = new ProducerConfig(properties);
-        producer = new Producer(producerConfig);
-    }
-
-    @Override
-    protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
-        try {
-            String output = new ObjectMapper().writeValueAsString(event);
-            // partition key may cause data skew
-            //producer.send(new KeyedMessage(this.topicId, key, output));
-            producer.send(new KeyedMessage(this.topicId, output));
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            throw ex;
-        }
-    }
-
-    @Override
-    public void afterInstall() {
-        ensureTopicCreated();
-    }
-
-    private void ensureTopicCreated() {
-        LOG.info("TODO: ensure kafka topic {} created", this.topicId);
-    }
-
-    private void ensureTopicDeleted() {
-        LOG.info("TODO: ensure kafka topic {} deleted", this.topicId);
-    }
-
-    @Override
-    public void cleanup() {
-        if (this.producer != null) {
-            this.producer.close();
-        }
-    }
-
-    @Override
-    public void afterUninstall() {
-        ensureTopicDeleted();
-    }
-
-    public static class Provider implements StreamSinkProvider<KafkaStreamSink, KafkaStreamSinkConfig> {
-        private static final Logger LOG = LoggerFactory.getLogger(Provider.class);
-        private static final String DEAULT_SHARED_TOPIC_CONF_KEY = "dataSinkConfig.topic";
-
-        private String getStreamSpecificTopicConfigKey(String streamId) {
-            return String.format("dataSinkConfig.%s.topic",streamId);
-        }
-
-        @Override
-        public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
-            KafkaStreamSinkConfig desc = new KafkaStreamSinkConfig();
-            String streamSpecificTopicConfigKey = getStreamSpecificTopicConfigKey(streamId);
-            if (config.hasPath(streamSpecificTopicConfigKey)) {
-                desc.setTopicId(config.getString(streamSpecificTopicConfigKey));
-            } else if (config.hasPath(DEAULT_SHARED_TOPIC_CONF_KEY)) {
-                desc.setTopicId(config.getString(DEAULT_SHARED_TOPIC_CONF_KEY));
-                LOG.warn("Using default shared topic {}: {}", DEAULT_SHARED_TOPIC_CONF_KEY, desc.getTopicId());
-            } else {
-                LOG.error("Neither stream specific topic: {} nor default shared topic: {} found in config", streamSpecificTopicConfigKey, DEAULT_SHARED_TOPIC_CONF_KEY);
-                throw new IllegalArgumentException("Neither stream specific topic: "
-                    + streamSpecificTopicConfigKey + " nor default shared topic: " + DEAULT_SHARED_TOPIC_CONF_KEY + " found in config");
-            }
-            desc.setBrokerList(config.getString("dataSinkConfig.brokerList"));
-            desc.setSerializerClass(config.hasPath("dataSinkConfig.serializerClass")
-                ? config.getString("dataSinkConfig.serializerClass") : "kafka.serializer.StringEncoder");
-            desc.setKeySerializerClass(config.hasPath("dataSinkConfig.keySerializerClass")
-                ? config.getString("dataSinkConfig.keySerializerClass") : "kafka.serializer.StringEncoder");
-
-            // new added properties for async producer
-            desc.setNumBatchMessages(config.hasPath("dataSinkConfig.numBatchMessages")
-                ? config.getString("dataSinkConfig.numBatchMessages") : "1024");
-            desc.setProducerType(config.hasPath("dataSinkConfig.producerType")
-                ? config.getString("dataSinkConfig.producerType") : "async");
-            desc.setMaxQueueBufferMs(config.hasPath("dataSinkConfig.maxQueueBufferMs")
-                ? config.getString("dataSinkConfig.maxQueueBufferMs") : "3000");
-            desc.setRequestRequiredAcks(config.hasPath("dataSinkConfig.requestRequiredAcks")
-                ? config.getString("dataSinkConfig.requestRequiredAcks") : "1");
-            return desc;
-        }
-
-        @Override
-        public KafkaStreamSink getSink() {
-            return new KafkaStreamSink();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
deleted file mode 100644
index d5479df..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-public class KafkaStreamSinkConfig implements StreamSinkConfig {
-    private String topicId;
-    private String brokerList;
-    private String serializerClass;
-    private String keySerializerClass;
-    private String numBatchMessages;
-    private String maxQueueBufferMs;
-    private String producerType;
-    private String requestRequiredAcks;
-
-    public String getTopicId() {
-        return topicId;
-    }
-
-    public void setTopicId(String topicId) {
-        this.topicId = topicId;
-    }
-
-    public String getBrokerList() {
-        return brokerList;
-    }
-
-    public void setBrokerList(String brokerList) {
-        this.brokerList = brokerList;
-    }
-
-    public String getSerializerClass() {
-        return serializerClass;
-    }
-
-    public void setSerializerClass(String serializerClass) {
-        this.serializerClass = serializerClass;
-    }
-
-    public String getKeySerializerClass() {
-        return keySerializerClass;
-    }
-
-    public void setKeySerializerClass(String keySerializerClass) {
-        this.keySerializerClass = keySerializerClass;
-    }
-
-    public String getNumBatchMessages() {
-        return numBatchMessages;
-    }
-
-    public void setNumBatchMessages(String numBatchMessages) {
-        this.numBatchMessages = numBatchMessages;
-    }
-
-    public String getMaxQueueBufferMs() {
-        return maxQueueBufferMs;
-    }
-
-    public void setMaxQueueBufferMs(String maxQueueBufferMs) {
-        this.maxQueueBufferMs = maxQueueBufferMs;
-    }
-
-    public String getProducerType() {
-        return producerType;
-    }
-
-    public void setProducerType(String producerType) {
-        this.producerType = producerType;
-    }
-
-    public String getRequestRequiredAcks() {
-        return requestRequiredAcks;
-    }
-
-    public void setRequestRequiredAcks(String requestRequiredAcks) {
-        this.requestRequiredAcks = requestRequiredAcks;
-    }
-
-    @Override
-    public String getType() {
-        return "KAFKA";
-    }
-
-    @Override
-    public Class<?> getSinkType() {
-        return KafkaStreamSink.class;
-    }
-
-    @Override
-    public Class<? extends StreamSinkConfig> getConfigType() {
-        return KafkaStreamSinkConfig.class;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
deleted file mode 100644
index 3a02caf..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.OutputCollector;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
-
-    @Override
-    public void afterInstall() {
-        LOGGER.info("Executing afterInstall callback, do nothing");
-    }
-
-    @Override
-    public void afterUninstall() {
-        LOGGER.info("Executing afterUninstall callback, do nothing");
-    }
-
-    @Override
-    protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
-        LOGGER.info("Receiving {}", event);
-    }
-
-    public static class Provider implements StreamSinkProvider<LoggingStreamSink, DefaultStreamSinkConfig> {
-        @Override
-        public DefaultStreamSinkConfig getSinkConfig(String streamId, Config config) {
-            return new DefaultStreamSinkConfig(LoggingStreamSink.class);
-        }
-
-        @Override
-        public LoggingStreamSink getSink() {
-            return new LoggingStreamSink();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
deleted file mode 100644
index ad40772..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseRichBolt implements StreamSink<K> {
-    private static final Logger LOG = LoggerFactory.getLogger(StormStreamSink.class);
-    private String streamId;
-    private OutputCollector collector;
-
-    @Override
-    public void init(String streamId, K config) {
-        this.streamId = streamId;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    /**
-     * Implicitly hides the Tuple protocol inside code as Tuple[Key,Map].
-     */
-    @Override
-    public void execute(Tuple input) {
-        try {
-            Map event = null;
-            Object key = input.getValue(0);
-            if (input.size() < 2) {
-                event = tupleAsMap(input);
-            } else {
-                Object value = input.getValue(1);
-                if (value != null) {
-                    if (value instanceof Map) {
-                        event = (Map) input.getValue(1);
-                    } else {
-                        event = tupleAsMap(input);
-                    }
-                }
-            }
-            execute(key, event, collector);
-            collector.ack(input);
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            collector.reportError(ex);
-        }
-    }
-
-    protected abstract void execute(Object key, Map event, OutputCollector collector) throws Exception;
-
-    private Map tupleAsMap(Tuple tuple) {
-        Map values = new HashMap<>();
-        for (String field : tuple.getFields()) {
-            values.put(field, tuple.getValueByField(field));
-        }
-        return values;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
deleted file mode 100644
index 710e4d2..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamEventMapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import backtype.storm.tuple.Tuple;
-
-import java.io.Serializable;
-import java.util.List;
-
-@FunctionalInterface
-public interface StreamEventMapper extends Serializable {
-    /**
-     * Map from storm tuple to Stream Event.
-     *
-     * @param tuple
-     * @return
-     * @throws Exception
-     */
-    List<StreamEvent> map(Tuple tuple) throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
deleted file mode 100644
index ee2790c..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.app.ApplicationLifecycle;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-public interface StreamSink<T extends StreamSinkConfig> extends ApplicationLifecycle {
-    void init(String streamId,T config);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
deleted file mode 100644
index 0425fa6..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-import com.typesafe.config.Config;
-
-import java.lang.reflect.ParameterizedType;
-
-public interface StreamSinkProvider<S extends StreamSink<D>, D extends StreamSinkConfig> {
-    D getSinkConfig(String streamId, Config config);
-
-    S getSink();
-
-    default S getSink(String streamId, Config config) {
-        S s = getSink();
-        s.init(streamId, getSinkConfig(streamId, config));
-        return s;
-    }
-
-    default Class<? extends S> getSinkType() {
-        return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-    }
-
-    default Class<? extends D> getSinkConfigType() {
-        return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index 143e026..361b4c6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -28,7 +28,7 @@ import java.lang.reflect.ParameterizedType;
 import java.util.Optional;
 
 /**
- * Application Service Provider Interface.
+ * Application Service KafkaStreamMessaging Interface.
  *
  * @param <T> Application Type.
  */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/StreamConvertHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/StreamConvertHelper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/StreamConvertHelper.java
new file mode 100644
index 0000000..c25a93b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/StreamConvertHelper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.utils;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.common.utils.Tuple2;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class StreamConvertHelper {
+    private static Map tupleToMap(Tuple tuple) {
+        Map values = new HashMap<>();
+        for (String field : tuple.getFields()) {
+            values.put(field, tuple.getValueByField(field));
+        }
+        return values;
+    }
+
+    public static Tuple2<Object,Map> tupleToEvent(Tuple input) {
+        Map event = null;
+        Object key = input.getValue(0);
+        if (input.size() < 2) {
+            event = StreamConvertHelper.tupleToMap(input);
+        } else {
+            Object value = input.getValue(1);
+            if (value != null) {
+                if (value instanceof Map) {
+                    event = (Map) input.getValue(1);
+                } else {
+                    event = StreamConvertHelper.tupleToMap(input);
+                }
+            }
+        }
+        return new Tuple2<>(key, event);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
index a47e30a..7977650 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
@@ -23,13 +23,10 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
-import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.common.module.ModuleRegistry;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
 import org.junit.Ignore;
 
 import java.util.Arrays;
@@ -61,7 +58,7 @@ public class TestStormApplication extends StormApplication{
 
         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","fromStream","value"));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java
index de64e6a..906bf66 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/StaticEnvironmentTest.java
@@ -34,6 +34,6 @@ public class StaticEnvironmentTest {
      */
     @Test
     public void testStreamSinkSupport(){
-        environment.streamSink();
+        environment.stream();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
index 5733531..eda9383 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
@@ -54,7 +54,7 @@ public class MockStormApplication extends StormApplication {
         }
         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","fromStream","value"));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
new file mode 100644
index 0000000..b1a4193
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.stream;
+
+import org.apache.eagle.app.environment.builder.CEPFunction;
+import org.apache.eagle.app.environment.builder.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CEPFunctionTest {
+    @Test @Ignore("TODO: Not implement yet")
+    public void testSiddhiFunction() {
+        CEPFunction function = new CEPFunction(
+            "define stream inputStream (name string, value double);\n "
+                + "from inputStream#window.timeBatch( 1 min ) \n" +
+                "select name, avg(value) as avgValue\n" +
+                "group by name \n" +
+                "insert into outputStream ",
+            "inputStream","outputStream");
+        Collector collector = new Collector() {
+            @Override
+            public void collect(Object key, Map event) {
+
+            }
+        };
+        function.open(collector);
+        function.transform(new HashMap<String,Object>() {{
+            put("name","cpu.usage");
+            put("value", 0.98);
+        }});
+        function.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
index 4dbf0c4..377eed1 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -43,9 +43,8 @@
 		"store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore"
 	},
 	"application":{
-		"sink":{
-			"type": "org.apache.eagle.app.sink.KafkaStreamSink"
-			"boostrap.server":"localhost:9092"
+		"stream": {
+			"provider": "org.apache.eagle.app.messaging.KafkaStreamProvider"
 		}
 		"provider":{
 			"loader": "org.apache.eagle.app.service.impl.ApplicationProviderSPILoader"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/Tuple2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/Tuple2.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/Tuple2.java
new file mode 100644
index 0000000..ac3dc7c
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/Tuple2.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.utils;
+
+import java.io.Serializable;
+
+public class Tuple2<F1, F2> implements Serializable {
+    private final F1 f1;
+    private final F2 f2;
+
+    public Tuple2(F1 f1, F2 f2) {
+        this.f1 = f1;
+        this.f2 = f2;
+    }
+
+    public F2 f1() {
+        return f2;
+    }
+
+    public F1 f0() {
+        return f1;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("(%s,%s)", this.f1, this.f2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
index 47d16fd..a183351 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
@@ -19,10 +19,13 @@ package org.apache.eagle.metadata.model;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.metadata.utils.StreamIdConversions;
 
+import javax.xml.transform.stream.StreamSource;
+
 public class StreamDesc {
     private String streamId;
     private StreamDefinition schema;
-    private StreamSinkConfig sink;
+    private StreamSinkConfig sinkConfig;
+    private StreamSourceConfig sourceConfig;
 
     public String getStreamId() {
         return streamId;
@@ -40,11 +43,19 @@ public class StreamDesc {
         this.schema = streamSchema;
     }
 
-    public StreamSinkConfig getSink() {
-        return sink;
+    public StreamSinkConfig getSinkConfig() {
+        return sinkConfig;
+    }
+
+    public void setSinkConfig(StreamSinkConfig sinkDesc) {
+        this.sinkConfig = sinkDesc;
+    }
+
+    public StreamSourceConfig getSourceConfig() {
+        return sourceConfig;
     }
 
-    public void setSink(StreamSinkConfig sinkDesc) {
-        this.sink = sinkDesc;
+    public void setSourceConfig(StreamSourceConfig sourceConfig) {
+        this.sourceConfig = sourceConfig;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSourceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSourceConfig.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSourceConfig.java
new file mode 100644
index 0000000..4034221
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSourceConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metadata.model;
+
+import java.io.Serializable;
+
+public interface StreamSourceConfig extends Serializable {
+    String getType();
+
+    Class<?> getSourceType();
+
+    Class<? extends StreamSourceConfig> getConfigType();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml b/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml
index 82e84ed..d408c5e 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/test/resources/application-test.xml
@@ -1,3 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
 <configuration>
     <property>
         <name>testkey1</name>