You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:14 UTC

[06/84] [partial] eagle git commit: Clean repo for eagle site

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
deleted file mode 100644
index 51dad41..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.app.environment.builder;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.app.utils.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class CounterToRateFunction implements TransformFunction {
-    private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class);
-    private final Map<String, CounterValue> cache;
-    private MetricDescriptor metricDescriptor;
-    private Collector collector;
-
-    public CounterToRateFunction(MetricDescriptor metricDescriptor, long heartbeat, TimeUnit unit, final Clock clock) {
-        final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit);
-        this.cache = new LinkedHashMap<String, CounterValue>(16, 0.75f, true) {
-            protected boolean removeEldestEntry(Map.Entry<String, CounterValue> eldest) {
-                final long now = clock.now();
-                final long lastMod = eldest.getValue().getTimestamp();
-                final boolean expired = (now - lastMod) > heartbeatMillis;
-                if (expired) {
-                    LOG.debug("heartbeat interval exceeded, expiring {}", eldest.getKey());
-                }
-                return expired;
-            }
-        };
-        this.metricDescriptor = metricDescriptor;
-    }
-
-    @Override
-    public String getName() {
-        return "CounterToRate";
-    }
-
-    @Override
-    public void open(Collector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void transform(Map event) {
-        Metric metric = toMetric(event);
-        LOG.debug("received {} metrics", metric);
-        if (new DefaultCountMetricFilter().apply(metric.getMetricName())) {
-            final String metricName = metric.getMetricName();
-            final CounterValue prev = cache.get(metricName);
-            if (prev != null) {
-                final double rate = prev.computeRate(metric);
-                event.put(metricDescriptor.getValueField(), rate);
-                collector.collect(event.toString(), event);
-            } else {
-                CounterValue current = new CounterValue(metric);
-                cache.put(metricName, current);
-            }
-        } else {
-            collector.collect(event.toString(), event);
-        }
-
-    }
-
-    @Override
-    public void close() {
-        cache.clear();
-    }
-
-    private Metric toMetric(Map event) {
-
-        String metricName = "";
-        for (String dimensionField : metricDescriptor.getDimensionFields()) {
-            metricName += event.get(dimensionField) + "-";
-        }
-        metricName += metricDescriptor.getMetricNameSelector().getMetricName(event);
-
-        long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event);
-
-        return new Metric(metricName, timestamp, getCurrentValue(event));
-    }
-
-    private double getCurrentValue(Map event) {
-        double[] values;
-        if (event.containsKey(metricDescriptor.getValueField())) {
-            values = new double[] {(double) event.get(metricDescriptor.getValueField())};
-        } else {
-            LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event);
-            values = new double[] {0};
-        }
-        return values[0];
-    }
-
-    protected static class CounterValue {
-        private long timestamp;
-        private double value;
-
-        public CounterValue(long timestamp, double value) {
-            this.timestamp = timestamp;
-            this.value = value;
-        }
-
-        public CounterValue(Metric m) {
-            this(m.getTimestamp(), m.getNumberValue().doubleValue());
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        public double computeRate(Metric m) {
-            final long currentTimestamp = m.getTimestamp();
-            final double currentValue = m.getNumberValue().doubleValue();
-
-            final long durationMillis = currentTimestamp - timestamp;
-            final double delta = currentValue - value;
-
-            timestamp = currentTimestamp;
-            value = currentValue;
-
-            return computeRate(durationMillis, delta);
-        }
-
-        private double computeRate(long durationMillis, double delta) {
-            final double millisPerSecond = 1000.0;
-            final double duration = durationMillis / millisPerSecond;
-            return (duration <= 0.0 || delta <= 0.0) ? 0.0 : delta / duration;
-        }
-
-        @Override
-        public String toString() {
-            return "CounterValue{" + "timestamp=" + timestamp + ", value=" + value + '}';
-        }
-    }
-
-
-    protected final class Metric {
-        private final String metricName;
-        private final long timestamp;
-        private final Object value;
-
-        public Metric(String metricName, long timestamp, Object value) {
-            this.metricName = Preconditions.checkNotNull(metricName, "metricName");
-            this.timestamp = timestamp;
-            this.value = Preconditions.checkNotNull(value, "value");
-        }
-
-        public String getMetricName() {
-            return metricName;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        public Object getValue() {
-            return value;
-        }
-
-        public Number getNumberValue() {
-            return (Number) value;
-        }
-
-        public boolean hasNumberValue() {
-            return (value instanceof Number);
-        }
-
-        public boolean isCounter() {
-            return metricName.endsWith("count");
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj == null || !(obj instanceof Metric)) {
-                return false;
-            }
-            Metric m = (Metric) obj;
-            return metricName.equals(m.getMetricName())
-                && timestamp == m.getTimestamp()
-                && value.equals(m.getValue());
-        }
-
-        @Override
-        public int hashCode() {
-            int result = metricName.hashCode();
-            result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-            result = 31 * result + value.hashCode();
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}';
-        }
-    }
-
-    private class DefaultCountMetricFilter implements CountMetricFilter {
-        @Override
-        public Boolean apply(String metricName) {
-            return metricName.endsWith("count");
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
deleted file mode 100644
index 04c5bf9..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import java.util.Map;
-
-public class MaxFunction extends AggregateFunction {
-    @Override
-    public String getName() {
-        return "MAX";
-    }
-
-    @Override
-    public void open(Collector collector) {
-        throw new IllegalStateException("TODO: Not implemented yet.");
-    }
-
-    @Override
-    public void transform(Map event) {
-        throw new IllegalStateException("TODO: Not implemented yet.");
-    }
-
-    @Override
-    public void close() {
-
-        throw new IllegalStateException("TODO: Not implemented yet.");
-    }
-
-    public static MaxFunction maxOf(String aggFieldName) {
-        MaxFunction function = new MaxFunction();
-        function.setAggFieldName(aggFieldName);
-        return function;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
deleted file mode 100644
index c33a92d..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-
-public class MetricDescriptor implements Serializable {
-
-    /**
-     * Support simple and complex name format, by default using "metric" field.
-     */
-    private MetricNameSelector metricNameSelector = new FieldMetricNameSelector("metric");
-    private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
-    private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");
-
-    private static final String DEFAULT_METRIC_GROUP_NAME = "Default";
-
-    public MetricNameSelector getMetricNameSelector() {
-        return metricNameSelector;
-    }
-
-    public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
-        this.metricNameSelector = metricNameSelector;
-    }
-
-    public MetricGroupSelector getMetricGroupSelector() {
-        return metricGroupSelector;
-    }
-
-    public void setMetricGroupSelector(MetricGroupSelector metricGroupSelector) {
-        this.metricGroupSelector = metricGroupSelector;
-    }
-
-    /**
-     * Support event/system time, by default using system time.
-     */
-    private TimestampSelector timestampSelector = new SystemTimestampSelector();
-
-    /**
-     * Metric dimension field name.
-     */
-    private List<String> dimensionFields;
-
-    /**
-     * Metric granularity.
-     */
-    private int granularity = Calendar.MINUTE;
-
-    private String valueField = "value";
-    private String resourceField = "resource";
-
-    public String getResourceField() {
-        return resourceField;
-    }
-
-    public void setResourceField(String resourceField) {
-        this.resourceField = resourceField;
-    }
-
-    public String getValueField() {
-        return valueField;
-    }
-
-    public void setValueField(String valueField) {
-        this.valueField = valueField;
-    }
-
-    public List<String> getDimensionFields() {
-        return dimensionFields;
-    }
-
-    public void setDimensionFields(List<String> dimensionFields) {
-        this.dimensionFields = dimensionFields;
-    }
-
-    public TimestampSelector getTimestampSelector() {
-        return timestampSelector;
-    }
-
-    public void setTimestampSelector(TimestampSelector timestampSelector) {
-        this.timestampSelector = timestampSelector;
-    }
-
-    public int getGranularity() {
-        return granularity;
-    }
-
-    public void setGranularity(int granularity) {
-        this.granularity = granularity;
-    }
-
-    public SiteIdSelector getSiteIdSelector() {
-        return siteIdSelector;
-    }
-
-    public void setSiteIdSelector(SiteIdSelector siteIdSelector) {
-        this.siteIdSelector = siteIdSelector;
-    }
-
-
-    @FunctionalInterface
-    public interface MetricNameSelector extends Serializable {
-        String getMetricName(Map event);
-    }
-
-    @FunctionalInterface
-    public interface MetricGroupSelector extends Serializable {
-        String getMetricGroup(Map event);
-    }
-
-    public static class FixedMetricGroupSelector implements MetricGroupSelector {
-        private final String groupName;
-
-        private FixedMetricGroupSelector(String groupName) {
-            this.groupName = groupName;
-        }
-
-        @Override
-        public String getMetricGroup(Map event) {
-            return groupName;
-        }
-    }
-
-    @FunctionalInterface
-    public interface TimestampSelector extends Serializable {
-        Long getTimestamp(Map event);
-    }
-
-    @FunctionalInterface
-    public interface SiteIdSelector extends Serializable {
-        String getSiteId(Map event);
-    }
-
-    public class FixedSiteIdSelector implements SiteIdSelector {
-        private final String siteId;
-
-        private FixedSiteIdSelector(String siteId) {
-            this.siteId = siteId;
-        }
-
-        @Override
-        public String getSiteId(Map event) {
-            return this.siteId;
-        }
-    }
-
-    private class FieldSiteIdSelector implements SiteIdSelector {
-        private final String siteIdFieldName;
-
-        public FieldSiteIdSelector(String siteIdFieldName) {
-            this.siteIdFieldName = siteIdFieldName;
-        }
-
-        @Override
-        public String getSiteId(Map event) {
-            return (String) event.getOrDefault(this.siteIdFieldName, "UNKNOWN");
-        }
-    }
-
-    public MetricDescriptor namedBy(MetricNameSelector metricNameSelector) {
-        this.setMetricNameSelector(metricNameSelector);
-        return this;
-    }
-
-    public MetricDescriptor siteAs(SiteIdSelector siteIdSelector) {
-        this.setSiteIdSelector(siteIdSelector);
-        return this;
-    }
-
-    public MetricDescriptor siteAs(String siteId) {
-        this.setSiteIdSelector(new FixedSiteIdSelector(siteId));
-        return this;
-    }
-
-    public MetricDescriptor siteByField(String fieldName) {
-        this.setMetricNameSelector(new FieldMetricNameSelector(fieldName));
-        return this;
-    }
-
-    /**
-     * @see java.util.Calendar
-     */
-    public MetricDescriptor granularity(int granularity) {
-        this.setGranularity(granularity);
-        return this;
-    }
-
-    public MetricDescriptor namedByField(String nameField) {
-        this.setMetricNameSelector(new FieldMetricNameSelector(nameField));
-        return this;
-    }
-
-    public static MetricDescriptor metricGroupAs(String metricGroupName) {
-        return metricGroupAs(new FixedMetricGroupSelector(metricGroupName));
-    }
-
-    public static MetricDescriptor metricGroupAs(MetricGroupSelector groupSelector) {
-        MetricDescriptor metricDescriptor = new MetricDescriptor();
-        metricDescriptor.setMetricGroupSelector(groupSelector);
-        return metricDescriptor;
-    }
-
-    public static MetricDescriptor metricGroupByField(String fieldName, String defaultGroupName) {
-        MetricDescriptor metricDescriptor = new MetricDescriptor();
-        metricDescriptor.setMetricGroupSelector((MetricGroupSelector) event -> {
-            if (event.containsKey(fieldName)) {
-                return (String) event.get(fieldName);
-            } else {
-                return defaultGroupName;
-            }
-        });
-        return metricDescriptor;
-    }
-
-    public static MetricDescriptor metricGroupByField(String fieldName) {
-        return metricGroupByField(fieldName, DEFAULT_METRIC_GROUP_NAME);
-    }
-
-    public MetricDescriptor eventTimeByField(String timestampField) {
-        this.setTimestampSelector(new EventTimestampSelector(timestampField));
-        return this;
-    }
-
-    public MetricDescriptor dimensionFields(String... dimensionFields) {
-        this.setDimensionFields(Arrays.asList(dimensionFields));
-        return this;
-    }
-
-    public MetricDescriptor valueField(String valueField) {
-        this.setValueField(valueField);
-        return this;
-    }
-
-    public class EventTimestampSelector implements TimestampSelector {
-        private final String timestampField;
-
-        EventTimestampSelector(String timestampField) {
-            this.timestampField = timestampField;
-        }
-
-        @Override
-        public Long getTimestamp(Map event) {
-            if (event.containsKey(timestampField)) {
-                Object timestampValue = event.get(timestampField);
-                if (timestampValue instanceof Integer) {
-                    return Long.valueOf((Integer) timestampValue);
-                }
-                if (timestampValue instanceof String) {
-                    return Long.valueOf((String) timestampValue);
-                } else {
-                    return (Long) timestampValue;
-                }
-            } else {
-                throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists");
-            }
-        }
-    }
-
-    public static class SystemTimestampSelector implements TimestampSelector {
-        @Override
-        public Long getTimestamp(Map event) {
-            return System.currentTimeMillis();
-        }
-    }
-
-    public static class FieldMetricNameSelector implements MetricNameSelector {
-        private final String fieldName;
-
-        FieldMetricNameSelector(String fieldName) {
-            this.fieldName = fieldName;
-        }
-
-        @Override
-        public String getMetricName(Map event) {
-            if (event.containsKey(fieldName)) {
-                return (String) event.get(fieldName);
-            } else {
-                throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
deleted file mode 100644
index 9135cc8..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import backtype.storm.task.OutputCollector;
-
-import java.util.Arrays;
-
-import java.util.Map;
-
-public class StormOutputCollector implements Collector {
-    private final OutputCollector delegate;
-
-    StormOutputCollector(OutputCollector delegate) {
-        this.delegate = delegate;
-    }
-
-    @Override
-    public void collect(Object key, Map event) {
-        delegate.emit(Arrays.asList(key, event));
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
deleted file mode 100644
index 11974ff..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public interface TransformFunction extends Serializable {
-    String getName();
-
-    void open(Collector collector);
-
-    void transform(Map event);
-
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
deleted file mode 100644
index dbc7239..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.app.utils.StreamConvertHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class TransformFunctionBolt extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(TransformFunctionBolt.class);
-    private final TransformFunction function;
-    private OutputCollector collector;
-
-    public TransformFunctionBolt(TransformFunction function) {
-        this.function = function;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.function.open(new StormOutputCollector(collector));
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        try {
-            this.function.transform(StreamConvertHelper.tupleToEvent(input).f1());
-            this.collector.ack(input);
-        } catch (Throwable throwable) {
-            LOG.error("Transform error: {}", input, throwable);
-            this.collector.reportError(throwable);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("f1","f2"));
-    }
-
-    @Override
-    public void cleanup() {
-        this.function.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
deleted file mode 100644
index 4c576cf..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.environment.AbstractEnvironment;
-import com.typesafe.config.Config;
-
-/**
- * Spark Execution Environment Context.
- */
-public class SparkEnvironment extends AbstractEnvironment {
-    public SparkEnvironment(Config config) {
-        super(config);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
deleted file mode 100644
index 1cad34f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import com.typesafe.config.Config;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-
-public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> {
-    @Override
-    public void prepare(SparkEnvironment environment) {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public SparkEnvironment environment() {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public void start(Application executor, Config config) {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public void stop(Application executor, Config config) {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public ApplicationEntity.Status status(Application executor, Config config) {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> {
-        @Override
-        public SparkExecutionRuntime get() {
-            return new SparkExecutionRuntime();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java
deleted file mode 100644
index b5c7484..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.StaticApplication;
-
-/**
- * Web Application Container.
- */
-public class StaticApplicationExecutor {
-    private final StaticApplication webApplication;
-
-    public StaticApplicationExecutor(StaticApplication webApplication) {
-        this.webApplication = webApplication;
-    }
-
-    public StaticApplication getWebApplication() {
-        return webApplication;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java
deleted file mode 100644
index 1c17b1f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.AbstractEnvironment;
-
-public class StaticEnvironment extends AbstractEnvironment {
-    public StaticEnvironment(Config config) {
-        super(config);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java
deleted file mode 100644
index b6e3fed..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.app.environment.impl;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StaticExecutionRuntime.
- */
-public class StaticExecutionRuntime implements ExecutionRuntime<StaticEnvironment,StaticApplicationExecutor> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(StaticExecutionRuntime.class);
-
-    private StaticEnvironment environment;
-
-    @Override
-    public void prepare(StaticEnvironment environment) {
-        this.environment = environment;
-    }
-
-    @Override
-    public StaticEnvironment environment() {
-        return this.environment;
-    }
-
-    @Override
-    public void start(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) {
-        LOGGER.warn("Starting {}, do nothing",executor);
-    }
-
-    @Override
-    public void stop(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) {
-        LOGGER.warn("Stopping {}, do nothing",executor);
-    }
-
-    @Override
-    public ApplicationEntity.Status status(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) {
-        LOGGER.warn("Checking status {}, do nothing",executor);
-        return ApplicationEntity.Status.INITIALIZED;
-    }
-
-    public static class Provider implements ExecutionRuntimeProvider<StaticEnvironment,StaticApplicationExecutor> {
-        @Override
-        public ExecutionRuntime<StaticEnvironment, StaticApplicationExecutor> get() {
-            return new StaticExecutionRuntime();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
deleted file mode 100644
index 6827eef..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.environment.AbstractEnvironment;
-import org.apache.eagle.app.environment.builder.ApplicationBuilder;
-import org.apache.eagle.app.environment.builder.MetricDescriptor;
-import org.apache.eagle.app.environment.builder.TransformFunction;
-import org.apache.eagle.app.environment.builder.TransformFunctionBolt;
-import org.apache.eagle.app.messaging.*;
-import com.typesafe.config.Config;
-
-/**
- * Storm Execution Environment Context.
- */
-public class StormEnvironment extends AbstractEnvironment {
-    public StormEnvironment(Config envConfig) {
-        super(envConfig);
-    }
-
-    // ----------------------------------
-    // Classic Storm Topology Builder API
-    // ----------------------------------
-    public StormStreamSink getStreamSink(String streamId, Config config) {
-        return ((StormStreamSink) stream().getSink(streamId,config));
-    }
-
-    public StormStreamSource getStreamSource(String streamId, Config config) {
-        return (StormStreamSource) stream().getSource(streamId,config);
-    }
-
-    public MetricStreamPersist getMetricPersist(MetricDescriptor metricDescriptor, Config config) {
-        return new MetricStreamPersist(metricDescriptor, config);
-    }
-
-    public EntityStreamPersist getEntityPersist(Config config) {
-        return new EntityStreamPersist(config);
-    }
-
-    public MetricSchemaGenerator getMetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) {
-        return new MetricSchemaGenerator(metricDescriptor, config);
-    }
-
-    public TransformFunctionBolt getTransformer(TransformFunction function) {
-        return new TransformFunctionBolt(function);
-    }
-
-    // ----------------------------------
-    // Fluent Storm App Builder API
-    // ----------------------------------
-
-    public ApplicationBuilder newApp(Config appConfig) {
-        return new ApplicationBuilder(appConfig, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
deleted file mode 100644
index 2b4180d..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.*;
-import backtype.storm.utils.NimbusClient;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.ConfigRenderOptions;
-import org.apache.eagle.alert.engine.runner.StormMetricTaggedConsumer;
-import org.apache.eagle.alert.metric.MetricConfigs;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-import java.util.List;
-import java.util.Objects;
-
-public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
-    private static final Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
-    private static LocalCluster _localCluster;
-
-    private StormEnvironment environment;
-    private KillOptions killOptions;
-
-    private static LocalCluster getLocalCluster() {
-        if (_localCluster == null) {
-            _localCluster = new LocalCluster();
-        }
-        return _localCluster;
-    }
-
-    public StormExecutionRuntime() {
-        this.killOptions = new KillOptions();
-        this.killOptions.set_wait_secs(0);
-    }
-
-    @Override
-    public void prepare(StormEnvironment environment) {
-        this.environment = environment;
-    }
-
-    @Override
-    public StormEnvironment environment() {
-        return this.environment;
-    }
-
-    public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
-
-    private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
-    private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
-    private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
-    private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
-
-    private static final String WORKERS = "workers";
-
-    private backtype.storm.Config getStormConfig(com.typesafe.config.Config config) {
-        backtype.storm.Config conf = new backtype.storm.Config();
-        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
-        conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
-        conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
-        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
-        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
-        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
-        if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
-            nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
-        } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
-        }
-        Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
-        if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
-            nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
-        } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
-        }
-        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
-        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "backtype.storm.security.auth.SimpleTransportPlugin");
-        if (config.hasPath(WORKERS)) {
-            conf.setNumWorkers(config.getInt(WORKERS));
-        }
-
-        if (config.hasPath(TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
-            conf.put(TOPOLOGY_MESSAGE_TIMEOUT_SECS, config.getInt(TOPOLOGY_MESSAGE_TIMEOUT_SECS));
-        }
-
-        if (config.hasPath(MetricConfigs.METRIC_SINK_CONF)) {
-            conf.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1);
-        }
-        return conf;
-    }
-
-    @Override
-    public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
-        String topologyName = config.getString("appId");
-        Preconditions.checkNotNull(topologyName,"[appId] is required by null for " + executor.getClass().getCanonicalName());
-        StormTopology topology = executor.execute(config, environment);
-        LOG.info("Starting {} ({}), mode: {}",topologyName, executor.getClass().getCanonicalName(), config.getString("mode"));
-        Config conf = getStormConfig(config);
-        if (ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))) {
-            String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
-            if (jarFile == null) {
-                jarFile = DynamicJarPathFinder.findPath(executor.getClass());
-            }
-            synchronized (StormExecutionRuntime.class) {
-                System.setProperty("storm.jar", jarFile);
-                LOG.info("Submitting as cluster mode ...");
-                try {
-                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
-                } catch (AlreadyAliveException | InvalidTopologyException e) {
-                    LOG.error(e.getMessage(), e);
-                    throw new RuntimeException(e.getMessage(),e);
-                } finally {
-                    System.clearProperty("storm.jar");
-                }
-            }
-        } else {
-            LOG.info("Submitting as local mode ...");
-            getLocalCluster().submitTopology(topologyName, conf, topology);
-            LOG.info("Submitted");
-        }
-        LOG.info("Started {} ({})",topologyName,executor.getClass().getCanonicalName());
-    }
-
-    @Override
-    public void stop(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
-        String appId = config.getString("appId");
-        LOG.info("Stopping topology {} ...", appId);
-        if (Objects.equals(config.getString("mode"), ApplicationEntity.Mode.CLUSTER.name())) {
-            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient();
-            try {
-                stormClient.killTopologyWithOpts(appId, this.killOptions);
-            } catch (NotAliveException | TException e) {
-                LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause());
-                throw new RuntimeException(e.getMessage(),e);
-            }
-        } else {
-            getLocalCluster().killTopologyWithOpts(appId, this.killOptions);
-        }
-        LOG.info("Stopped topology {}", appId);
-    }
-
-    @Override
-    public ApplicationEntity.Status status(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
-        String appId = config.getString("appId");
-        LOG.info("Fetching {} status", appId);
-        List<TopologySummary> topologySummaries ;
-        ApplicationEntity.Status status = null;
-        try {
-            if (Objects.equals(config.getString("mode"), ApplicationEntity.Mode.CLUSTER.name())) {
-                Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient();
-                topologySummaries = stormClient.getClusterInfo().get_topologies();
-            } else {
-                topologySummaries = getLocalCluster().getClusterInfo().get_topologies();
-            }
-
-            for (TopologySummary topologySummary : topologySummaries) {
-                if (topologySummary.get_name().equalsIgnoreCase(appId)) {
-                    if (topologySummary.get_status().equalsIgnoreCase("ACTIVE")) {
-                        status = ApplicationEntity.Status.RUNNING;
-                    } else if (topologySummary.get_status().equalsIgnoreCase("INACTIVE")) {
-                        status = ApplicationEntity.Status.STOPPED;
-                    } else if (topologySummary.get_status().equalsIgnoreCase("KILLED")) {
-                        status = ApplicationEntity.Status.STOPPING;
-                    } else {
-                        LOG.error("Unknown storm topology ({}) status: {}", topologySummary.get_status(),topologySummary.get_status());
-                        status = ApplicationEntity.Status.UNKNOWN;
-                    }
-                }
-            }
-            //If not exist, return removed
-            if (status == null) {
-                status = ApplicationEntity.Status.REMOVED;
-            }
-        } catch (TException e) {
-            LOG.error("Got error to fetch status of {}", appId, e);
-            status = ApplicationEntity.Status.UNKNOWN;
-        }
-        LOG.info("{} status is {}", appId, status);
-        return status;
-    }
-
-    public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
-        @Override
-        public StormExecutionRuntime get() {
-            return new StormExecutionRuntime();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
deleted file mode 100644
index 76d6e1b..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import backtype.storm.Config;
-
-/**
- * copy from storm StormSubmitter
- * just rewrite StormSubmitter that does not support summit other jars once submittedJar is set.
- * Our implementation will not add this restrict.
- * Use this class to submit topologies to run on the Storm cluster. You should run your program
- * with the "storm jar" command from the command-line, and then use this class to
- * submit your topologies.
- */
-
-public class StormSubmitter {
-    public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
-
-    private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
-
-    private static Nimbus.Iface localNimbus = null;
-
-    public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
-        StormSubmitter.localNimbus = localNimbusHandler;
-    }
-
-    /**
-     * Submits a topology to run on the cluster. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     */
-    public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
-        submitTopology(name, stormConf, topology, null, null);
-    }
-
-    /**
-     * Submits a topology to run on the cluster. A topology runs forever or until
-     * explicitly killed.
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology.
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     */
-    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
-        throws AlreadyAliveException, InvalidTopologyException {
-        submitTopology(name, stormConf, topology, opts, null);
-    }
-
-    /**
-     * Submits a topology to run on the cluster. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology
-     * @param progressListener to track the progress of the jar upload process
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     */
-    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
-                                      ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
-        if (!Utils.isValidConf(stormConf)) {
-            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
-        }
-        stormConf = new HashMap(stormConf);
-        stormConf.putAll(Utils.readCommandLineOpts());
-        Map conf = Utils.readStormConfig();
-        conf.putAll(stormConf);
-        try {
-            String serConf = JSONValue.toJSONString(stormConf);
-            if (localNimbus != null) {
-                LOG.info("Submitting topology " + name + " in local mode");
-                localNimbus.submitTopology(name, null, serConf, topology);
-            } else {
-                NimbusClient client = NimbusClient.getConfiguredClient(conf);
-                if (topologyNameExists(conf, name)) {
-                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
-                }
-                submitJar(conf, progressListener);
-                try {
-                    LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
-                    if (opts != null) {
-                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
-                    } else {
-                        // this is for backwards compatibility
-                        client.getClient().submitTopology(name, submittedJar, serConf, topology);
-                    }
-                } catch (InvalidTopologyException e) {
-                    LOG.warn("Topology submission exception: " + e.get_msg());
-                    throw e;
-                } catch (AlreadyAliveException e) {
-                    LOG.warn("Topology already alive exception", e);
-                    throw e;
-                } finally {
-                    client.close();
-                }
-            }
-            LOG.info("Finished submitting topology: " +  name);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     */
-
-    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
-        submitTopologyWithProgressBar(name, stormConf, topology, null);
-    }
-
-    /**
-     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
-     * explicitly killed.
-     *
-     *
-     * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}.
-     * @param topology the processing to execute.
-     * @param opts to manipulate the starting of the topology
-     * @throws AlreadyAliveException if a topology with this name is already running
-     * @throws InvalidTopologyException if an invalid topology was submitted
-     */
-
-    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
-        // show a progress bar so we know we're not stuck (especially on slow connections)
-        submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
-            @Override
-            public void onStart(String srcFile, String targetFile, long totalBytes) {
-                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
-            }
-
-            @Override
-            public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
-                int length = 50;
-                int p = (int)((length * bytesUploaded) / totalBytes);
-                String progress = StringUtils.repeat("=", p);
-                String todo = StringUtils.repeat(" ", length - p);
-
-                System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
-            }
-
-            @Override
-            public void onCompleted(String srcFile, String targetFile, long totalBytes) {
-                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
-            }
-        });
-    }
-
-    private static boolean topologyNameExists(Map conf, String name) {
-        NimbusClient client = NimbusClient.getConfiguredClient(conf);
-        try {
-            ClusterSummary summary = client.getClient().getClusterInfo();
-            for (TopologySummary s : summary.get_topologies()) {
-                if (s.get_name().equals(name)) {
-                    return true;
-                }
-            }
-            return false;
-
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            client.close();
-        }
-    }
-
-    private static String submittedJar = null;
-
-    private static void submitJar(Map conf, ProgressListener listener) {
-        LOG.info("before uploaded, submittedJar = {}", submittedJar);
-        String localJar = System.getProperty("storm.jar");
-        submittedJar = submitJar(conf, localJar, listener);
-        LOG.info("after uploaded, submittedJar = {}", submittedJar);
-    }
-
-    /**
-     * Submit jar file
-     * @param conf the topology-specific configuration. See {@link Config}.
-     * @param localJar file path of the jar file to submit
-     * @return the remote location of the submitted jar
-     */
-    public static String submitJar(Map conf, String localJar) {
-        return submitJar(conf, localJar, null);
-    }
-
-    /**
-     * Submit jar file
-     * @param conf the topology-specific configuration. See {@link Config}.
-     * @param localJar file path of the jar file to submit
-     * @param listener progress listener to track the jar file upload
-     * @return the remote location of the submitted jar
-     */
-    public static String submitJar(Map conf, String localJar, ProgressListener listener) {
-        if (localJar == null) {
-            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
-        }
-
-        NimbusClient client = NimbusClient.getConfiguredClient(conf);
-        try {
-            String uploadLocation = client.getClient().beginFileUpload();
-            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
-            BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
-
-            long totalSize = new File(localJar).length();
-            if (listener != null) {
-                listener.onStart(localJar, uploadLocation, totalSize);
-            }
-
-            long bytesUploaded = 0;
-            while (true) {
-                byte[] toSubmit = is.read();
-                bytesUploaded += toSubmit.length;
-                if (listener != null) {
-                    listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
-                }
-
-                if (toSubmit.length == 0) {
-                    break;
-                }
-                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
-            }
-            client.getClient().finishFileUpload(uploadLocation);
-
-            if (listener != null) {
-                listener.onCompleted(localJar, uploadLocation, totalSize);
-            }
-
-            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
-            return uploadLocation;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            client.close();
-        }
-    }
-
-    /**
-     * Interface use to track progress of file upload.
-     */
-    public interface ProgressListener {
-        /**
-         * called before file is uploaded.
-         * @param srcFile - jar file to be uploaded
-         * @param targetFile - destination file
-         * @param totalBytes - total number of bytes of the file
-         */
-        public void onStart(String srcFile, String targetFile, long totalBytes);
-
-        /**
-         * called whenever a chunk of bytes is uploaded.
-         * @param srcFile - jar file to be uploaded
-         * @param targetFile - destination file
-         * @param bytesUploaded - number of bytes transferred so far
-         * @param totalBytes - total number of bytes of the file
-         */
-        public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
-
-        /**
-         * called when the file is uploaded.
-         * @param srcFile - jar file to be uploaded
-         * @param targetFile - destination file
-         * @param totalBytes - total number of bytes of the file
-         */
-        public void onCompleted(String srcFile, String targetFile, long totalBytes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
deleted file mode 100644
index d1cecc9..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-public class DefaultStreamSinkConfig implements StreamSinkConfig {
-    private final Class<?> streamPersistClass;
-    private static final String NONE_STORAGE_TYPE = "NONE";
-
-    public DefaultStreamSinkConfig(Class<?> streamPersistClass) {
-        this.streamPersistClass = streamPersistClass;
-    }
-
-    @Override
-    public String getType() {
-        return NONE_STORAGE_TYPE;
-    }
-
-    public Class<?> getSinkType() {
-        return streamPersistClass;
-    }
-
-    @Override
-    public Class<? extends StreamSinkConfig> getConfigType() {
-        return DefaultStreamSinkConfig.class;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
deleted file mode 100644
index e216dc6..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class EntityStreamPersist extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(EntityStreamPersist.class);
-
-    private final Config config;
-    private IEagleServiceClient client;
-    private OutputCollector collector;
-    private int batchSize;
-    private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>();
-
-    public EntityStreamPersist(Config config) {
-        this.config = config;
-        this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.client = new EagleServiceClientImpl(config);
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        List<? extends TaggedLogAPIEntity> entities = (List<? extends TaggedLogAPIEntity>) input.getValue(0);
-        entityBucket.addAll(entities);
-
-        if (entityBucket.size() < batchSize) {
-            return;
-        }
-
-        try {
-            GenericServiceAPIResponseEntity response = client.create(entityBucket);
-            if (response.isSuccess()) {
-                LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
-                collector.ack(input);
-            } else {
-                LOG.error("Service side error: {}", response.getException());
-                collector.reportError(new IllegalStateException(response.getException()));
-            }
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-            collector.fail(input);
-        }
-        entityBucket.clear();
-    }
-
-    @Override
-    public void cleanup() {
-        try {
-            this.client.getJerseyClient().destroy();
-            this.client.close();
-        } catch (IOException e) {
-            LOG.error("Close client error: {}", e.getMessage(), e);
-        } finally {
-            super.cleanup();
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
deleted file mode 100644
index c8fe1b5..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import backtype.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class FlattenEventMapper implements StreamEventMapper {
-    private final String streamId;
-    private static final String TIMESTAMP_FIELD = "timestamp";
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlattenEventMapper.class);
-
-    public FlattenEventMapper(String streamId) {
-        this.streamId = streamId;
-    }
-
-    @Override
-    public List<StreamEvent> map(Tuple tuple) throws Exception {
-        long timestamp;
-        if (tuple.getFields().contains(TIMESTAMP_FIELD)) {
-            try {
-                timestamp = tuple.getLongByField("timestamp");
-            } catch (Exception ex) {
-                // if timestamp is not null
-                LOGGER.error(ex.getMessage(), ex);
-                timestamp = 0;
-            }
-        } else {
-            timestamp = System.currentTimeMillis();
-        }
-        Object[] values = new Object[tuple.getFields().size()];
-        for (int i = 0; i < tuple.getFields().size(); i++) {
-            values[i] = tuple.getValue(i);
-        }
-        StreamEvent event = new StreamEvent();
-        event.setTimestamp(timestamp);
-        event.setStreamId(streamId);
-        event.setData(values);
-        return Collections.singletonList(event);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
deleted file mode 100644
index 987ed0b..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * General Json Schema.
- * Different from org.apache.eagle.alert.engine.scheme.JsonScheme which is just to multi-topic cases.
- *
- * @see org.apache.eagle.alert.engine.scheme.JsonScheme
- */
-public class JsonSchema implements Scheme {
-    private static final long serialVersionUID = -8352896475656975577L;
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonSchema.class);
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    @Override
-    public Fields getOutputFields() {
-        return new Fields("f1","f2");
-    }
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public List<Object> deserialize(byte[] ser) {
-        try {
-            if (ser != null) {
-                Map map = mapper.readValue(ser, Map.class);
-                return Arrays.asList(map.hashCode(), map);
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Content is null, ignore");
-                }
-            }
-        } catch (IOException e) {
-            try {
-                LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e);
-            } catch (Exception ex) {
-                LOG.error(ex.getMessage(), ex);
-            }
-        }
-        return null;
-    }
-}
\ No newline at end of file