You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:14 UTC
[06/84] [partial] eagle git commit: Clean repo for eagle site
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
deleted file mode 100644
index 51dad41..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.app.environment.builder;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.app.utils.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TransformFunction} that rewrites counter-style metric events into per-second rates.
- *
- * <p>An event is treated as a counter when its composed metric name (the dimension values joined
- * with {@code "-"}, followed by the selected metric name) ends with {@code "count"}. The first
- * sample seen for a name is only cached and is not emitted; every later sample has its value field
- * replaced by the rate computed against the cached sample and is then emitted. Events whose name
- * does not end with {@code "count"} are forwarded unchanged.
- */
-public class CounterToRateFunction implements TransformFunction {
-    private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class);
-    private final Map<String, CounterValue> cache;
-    private MetricDescriptor metricDescriptor;
-    private Collector collector;
-
-    /**
-     * Creates the function with an access-ordered cache of the previous sample per metric name.
-     *
-     * @param metricDescriptor describes where name, dimensions, timestamp and value live in an event
-     * @param heartbeat maximum age of a cached sample before it may be expired
-     * @param unit unit of {@code heartbeat}
-     * @param clock source of the current time used by the expiry check
-     */
-    public CounterToRateFunction(MetricDescriptor metricDescriptor, long heartbeat, TimeUnit unit, final Clock clock) {
-        final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit);
-        // LinkedHashMap consults removeEldestEntry after an insertion, so expiry is only evaluated
-        // for the least-recently-accessed entry, and only when a new metric name is cached.
-        this.cache = new LinkedHashMap<String, CounterValue>(16, 0.75f, true) {
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<String, CounterValue> eldest) {
-                final long now = clock.now();
-                final long lastMod = eldest.getValue().getTimestamp();
-                final boolean expired = (now - lastMod) > heartbeatMillis;
-                if (expired) {
-                    LOG.debug("heartbeat interval exceeded, expiring {}", eldest.getKey());
-                }
-                return expired;
-            }
-        };
-        this.metricDescriptor = metricDescriptor;
-    }
-
-    @Override
-    public String getName() {
-        return "CounterToRate";
-    }
-
-    @Override
-    public void open(Collector collector) {
-        this.collector = collector;
-    }
-
-    /**
-     * Converts a counter event into a rate event, or forwards a non-counter event untouched.
-     *
-     * @param event the raw event; for counters its value field is overwritten with the rate
-     */
-    @Override
-    public void transform(Map event) {
-        Metric metric = toMetric(event);
-        LOG.debug("received {} metrics", metric);
-        if (new DefaultCountMetricFilter().apply(metric.getMetricName())) {
-            final String metricName = metric.getMetricName();
-            final CounterValue prev = cache.get(metricName);
-            if (prev != null) {
-                // computeRate also advances the cached sample to the current one.
-                final double rate = prev.computeRate(metric);
-                event.put(metricDescriptor.getValueField(), rate);
-                collector.collect(event.toString(), event);
-            } else {
-                // No earlier sample to diff against: remember this one and emit nothing.
-                CounterValue current = new CounterValue(metric);
-                cache.put(metricName, current);
-            }
-        } else {
-            collector.collect(event.toString(), event);
-        }
-
-    }
-
-    @Override
-    public void close() {
-        cache.clear();
-    }
-
-    /**
-     * Builds the internal {@link Metric} for an event: composed name, timestamp and current value.
-     */
-    private Metric toMetric(Map event) {
-        final StringBuilder metricName = new StringBuilder();
-        // MetricDescriptor leaves dimensionFields null until configured, so guard against it.
-        final Iterable<String> dimensionFields = metricDescriptor.getDimensionFields();
-        if (dimensionFields != null) {
-            for (String dimensionField : dimensionFields) {
-                metricName.append(event.get(dimensionField)).append('-');
-            }
-        }
-        metricName.append(metricDescriptor.getMetricNameSelector().getMetricName(event));
-
-        long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event);
-
-        return new Metric(metricName.toString(), timestamp, getCurrentValue(event));
-    }
-
-    /**
-     * Reads the event's value field as a double, falling back to 0 (with a warning) when absent.
-     */
-    private double getCurrentValue(Map event) {
-        final String valueField = metricDescriptor.getValueField();
-        if (!event.containsKey(valueField)) {
-            LOG.warn("Event has no value field '{}': {}, use 0 by default", valueField, event);
-            return 0;
-        }
-        // Accept any numeric type; a direct (double) cast only succeeds for Double values and
-        // throws ClassCastException for Integer/Long/Float.
-        return ((Number) event.get(valueField)).doubleValue();
-    }
-
-    /**
-     * Mutable holder of the last observed sample (timestamp and value) of one counter.
-     */
-    protected static class CounterValue {
-        private long timestamp;
-        private double value;
-
-        public CounterValue(long timestamp, double value) {
-            this.timestamp = timestamp;
-            this.value = value;
-        }
-
-        public CounterValue(Metric m) {
-            this(m.getTimestamp(), m.getNumberValue().doubleValue());
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        /**
-         * Computes the per-second rate between the stored sample and {@code m}, then stores
-         * {@code m} as the new reference sample.
-         *
-         * @param m the current sample
-         * @return delta per second, or 0 when the elapsed time or the delta is not positive
-         */
-        public double computeRate(Metric m) {
-            final long currentTimestamp = m.getTimestamp();
-            final double currentValue = m.getNumberValue().doubleValue();
-
-            final long durationMillis = currentTimestamp - timestamp;
-            final double delta = currentValue - value;
-
-            timestamp = currentTimestamp;
-            value = currentValue;
-
-            return computeRate(durationMillis, delta);
-        }
-
-        private double computeRate(long durationMillis, double delta) {
-            final double millisPerSecond = 1000.0;
-            final double duration = durationMillis / millisPerSecond;
-            return (duration <= 0.0 || delta <= 0.0) ? 0.0 : delta / duration;
-        }
-
-        @Override
-        public String toString() {
-            return "CounterValue{" + "timestamp=" + timestamp + ", value=" + value + '}';
-        }
-    }
-
-
-    /**
-     * Immutable view of one event as (composed metric name, timestamp, value).
-     */
-    protected final class Metric {
-        private final String metricName;
-        private final long timestamp;
-        private final Object value;
-
-        public Metric(String metricName, long timestamp, Object value) {
-            this.metricName = Preconditions.checkNotNull(metricName, "metricName");
-            this.timestamp = timestamp;
-            this.value = Preconditions.checkNotNull(value, "value");
-        }
-
-        public String getMetricName() {
-            return metricName;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        public Object getValue() {
-            return value;
-        }
-
-        public Number getNumberValue() {
-            return (Number) value;
-        }
-
-        public boolean hasNumberValue() {
-            return (value instanceof Number);
-        }
-
-        public boolean isCounter() {
-            return metricName.endsWith("count");
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof Metric)) {
-                return false;
-            }
-            Metric m = (Metric) obj;
-            return metricName.equals(m.getMetricName())
-                && timestamp == m.getTimestamp()
-                && value.equals(m.getValue());
-        }
-
-        @Override
-        public int hashCode() {
-            int result = metricName.hashCode();
-            result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-            result = 31 * result + value.hashCode();
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}';
-        }
-    }
-
-    /**
-     * Default counter test: a metric is a counter when its name ends with {@code "count"}.
-     */
-    private class DefaultCountMetricFilter implements CountMetricFilter {
-        @Override
-        public Boolean apply(String metricName) {
-            return metricName.endsWith("count");
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
deleted file mode 100644
index 04c5bf9..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MaxFunction.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import java.util.Map;
-
-/**
- * Placeholder "MAX" aggregate function: only the name and the factory are usable, every
- * lifecycle method currently throws {@link IllegalStateException}.
- */
-public class MaxFunction extends AggregateFunction {
-    private static final String NOT_IMPLEMENTED = "TODO: Not implemented yet.";
-
-    /**
-     * Creates a {@code MaxFunction} configured with the given aggregation field name.
-     *
-     * @param aggFieldName name handed to {@code setAggFieldName}
-     * @return the configured function
-     */
-    public static MaxFunction maxOf(String aggFieldName) {
-        final MaxFunction max = new MaxFunction();
-        max.setAggFieldName(aggFieldName);
-        return max;
-    }
-
-    @Override
-    public String getName() {
-        return "MAX";
-    }
-
-    @Override
-    public void open(Collector collector) {
-        throw new IllegalStateException(NOT_IMPLEMENTED);
-    }
-
-    @Override
-    public void transform(Map event) {
-        throw new IllegalStateException(NOT_IMPLEMENTED);
-    }
-
-    @Override
-    public void close() {
-        throw new IllegalStateException(NOT_IMPLEMENTED);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
deleted file mode 100644
index c33a92d..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Describes how metric attributes (name, group, site, timestamp, dimensions, value, resource)
- * are read from a raw event {@link Map}.
- *
- * <p>Instances are created through the static {@code metricGroup*} factories and refined with the
- * fluent methods, each of which returns {@code this}.
- */
-public class MetricDescriptor implements Serializable {
-
-    private static final String DEFAULT_METRIC_GROUP_NAME = "Default";
-
-    /**
-     * Support simple and complex name format, by default using "metric" field.
-     */
-    private MetricNameSelector metricNameSelector = new FieldMetricNameSelector("metric");
-    private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
-    private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");
-
-    public MetricNameSelector getMetricNameSelector() {
-        return metricNameSelector;
-    }
-
-    public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
-        this.metricNameSelector = metricNameSelector;
-    }
-
-    public MetricGroupSelector getMetricGroupSelector() {
-        return metricGroupSelector;
-    }
-
-    public void setMetricGroupSelector(MetricGroupSelector metricGroupSelector) {
-        this.metricGroupSelector = metricGroupSelector;
-    }
-
-    /**
-     * Support event/system time, by default using system time.
-     */
-    private TimestampSelector timestampSelector = new SystemTimestampSelector();
-
-    /**
-     * Metric dimension field name. Stays {@code null} until configured.
-     */
-    private List<String> dimensionFields;
-
-    /**
-     * Metric granularity, expressed as a {@link Calendar} field constant.
-     */
-    private int granularity = Calendar.MINUTE;
-
-    private String valueField = "value";
-    private String resourceField = "resource";
-
-    public String getResourceField() {
-        return resourceField;
-    }
-
-    public void setResourceField(String resourceField) {
-        this.resourceField = resourceField;
-    }
-
-    public String getValueField() {
-        return valueField;
-    }
-
-    public void setValueField(String valueField) {
-        this.valueField = valueField;
-    }
-
-    public List<String> getDimensionFields() {
-        return dimensionFields;
-    }
-
-    public void setDimensionFields(List<String> dimensionFields) {
-        this.dimensionFields = dimensionFields;
-    }
-
-    public TimestampSelector getTimestampSelector() {
-        return timestampSelector;
-    }
-
-    public void setTimestampSelector(TimestampSelector timestampSelector) {
-        this.timestampSelector = timestampSelector;
-    }
-
-    public int getGranularity() {
-        return granularity;
-    }
-
-    public void setGranularity(int granularity) {
-        this.granularity = granularity;
-    }
-
-    public SiteIdSelector getSiteIdSelector() {
-        return siteIdSelector;
-    }
-
-    public void setSiteIdSelector(SiteIdSelector siteIdSelector) {
-        this.siteIdSelector = siteIdSelector;
-    }
-
-
-    /** Extracts the metric name from an event. */
-    @FunctionalInterface
-    public interface MetricNameSelector extends Serializable {
-        String getMetricName(Map event);
-    }
-
-    /** Extracts the metric group from an event. */
-    @FunctionalInterface
-    public interface MetricGroupSelector extends Serializable {
-        String getMetricGroup(Map event);
-    }
-
-    /** Group selector that ignores the event and always returns one fixed group name. */
-    public static class FixedMetricGroupSelector implements MetricGroupSelector {
-        private final String groupName;
-
-        private FixedMetricGroupSelector(String groupName) {
-            this.groupName = groupName;
-        }
-
-        @Override
-        public String getMetricGroup(Map event) {
-            return groupName;
-        }
-    }
-
-    /** Extracts the timestamp from an event. */
-    @FunctionalInterface
-    public interface TimestampSelector extends Serializable {
-        Long getTimestamp(Map event);
-    }
-
-    /** Extracts the site id from an event. */
-    @FunctionalInterface
-    public interface SiteIdSelector extends Serializable {
-        String getSiteId(Map event);
-    }
-
-    /** Site selector that ignores the event and always returns one fixed site id. */
-    public class FixedSiteIdSelector implements SiteIdSelector {
-        private final String siteId;
-
-        private FixedSiteIdSelector(String siteId) {
-            this.siteId = siteId;
-        }
-
-        @Override
-        public String getSiteId(Map event) {
-            return this.siteId;
-        }
-    }
-
-    /** Site selector that reads the site id from an event field, defaulting to "UNKNOWN". */
-    private class FieldSiteIdSelector implements SiteIdSelector {
-        private final String siteIdFieldName;
-
-        public FieldSiteIdSelector(String siteIdFieldName) {
-            this.siteIdFieldName = siteIdFieldName;
-        }
-
-        @Override
-        public String getSiteId(Map event) {
-            return (String) event.getOrDefault(this.siteIdFieldName, "UNKNOWN");
-        }
-    }
-
-    public MetricDescriptor namedBy(MetricNameSelector metricNameSelector) {
-        this.setMetricNameSelector(metricNameSelector);
-        return this;
-    }
-
-    public MetricDescriptor siteAs(SiteIdSelector siteIdSelector) {
-        this.setSiteIdSelector(siteIdSelector);
-        return this;
-    }
-
-    public MetricDescriptor siteAs(String siteId) {
-        this.setSiteIdSelector(new FixedSiteIdSelector(siteId));
-        return this;
-    }
-
-    /**
-     * Reads the site id from the given event field.
-     *
-     * @param fieldName event field holding the site id
-     * @return this descriptor
-     */
-    public MetricDescriptor siteByField(String fieldName) {
-        // Must configure the site selector; installing a metric-name selector here would
-        // overwrite the metric name source and leave the site lookup untouched.
-        this.setSiteIdSelector(new FieldSiteIdSelector(fieldName));
-        return this;
-    }
-
-    /**
-     * @see java.util.Calendar
-     */
-    public MetricDescriptor granularity(int granularity) {
-        this.setGranularity(granularity);
-        return this;
-    }
-
-    public MetricDescriptor namedByField(String nameField) {
-        this.setMetricNameSelector(new FieldMetricNameSelector(nameField));
-        return this;
-    }
-
-    public static MetricDescriptor metricGroupAs(String metricGroupName) {
-        return metricGroupAs(new FixedMetricGroupSelector(metricGroupName));
-    }
-
-    public static MetricDescriptor metricGroupAs(MetricGroupSelector groupSelector) {
-        MetricDescriptor metricDescriptor = new MetricDescriptor();
-        metricDescriptor.setMetricGroupSelector(groupSelector);
-        return metricDescriptor;
-    }
-
-    /**
-     * Creates a descriptor whose group is read from {@code fieldName}, falling back to
-     * {@code defaultGroupName} when the event has no such key.
-     */
-    public static MetricDescriptor metricGroupByField(String fieldName, String defaultGroupName) {
-        MetricDescriptor metricDescriptor = new MetricDescriptor();
-        metricDescriptor.setMetricGroupSelector((MetricGroupSelector) event -> {
-            if (event.containsKey(fieldName)) {
-                return (String) event.get(fieldName);
-            } else {
-                return defaultGroupName;
-            }
-        });
-        return metricDescriptor;
-    }
-
-    public static MetricDescriptor metricGroupByField(String fieldName) {
-        return metricGroupByField(fieldName, DEFAULT_METRIC_GROUP_NAME);
-    }
-
-    public MetricDescriptor eventTimeByField(String timestampField) {
-        this.setTimestampSelector(new EventTimestampSelector(timestampField));
-        return this;
-    }
-
-    public MetricDescriptor dimensionFields(String... dimensionFields) {
-        this.setDimensionFields(Arrays.asList(dimensionFields));
-        return this;
-    }
-
-    public MetricDescriptor valueField(String valueField) {
-        this.setValueField(valueField);
-        return this;
-    }
-
-    /** Timestamp selector that reads the timestamp from a named event field. */
-    public class EventTimestampSelector implements TimestampSelector {
-        private final String timestampField;
-
-        EventTimestampSelector(String timestampField) {
-            this.timestampField = timestampField;
-        }
-
-        /**
-         * Returns the event's timestamp field as a {@code Long}.
-         *
-         * @throws IllegalArgumentException if the event has no such key
-         * @throws NumberFormatException if the field is a String that is not a valid long
-         * @throws ClassCastException if the field is neither a Number nor a String
-         */
-        @Override
-        public Long getTimestamp(Map event) {
-            if (!event.containsKey(timestampField)) {
-                throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists");
-            }
-            Object timestampValue = event.get(timestampField);
-            if (timestampValue instanceof Long) {
-                return (Long) timestampValue;
-            }
-            if (timestampValue instanceof Number) {
-                // Covers Integer as well as Short/Double/etc., which a (Long) cast would reject.
-                return ((Number) timestampValue).longValue();
-            }
-            if (timestampValue instanceof String) {
-                return Long.valueOf((String) timestampValue);
-            }
-            // A null value yields null and any other type a ClassCastException, as before.
-            return (Long) timestampValue;
-        }
-    }
-
-    /** Timestamp selector that ignores the event and returns the current system time. */
-    public static class SystemTimestampSelector implements TimestampSelector {
-        @Override
-        public Long getTimestamp(Map event) {
-            return System.currentTimeMillis();
-        }
-    }
-
-    /** Name selector that reads the metric name from a named event field. */
-    public static class FieldMetricNameSelector implements MetricNameSelector {
-        private final String fieldName;
-
-        FieldMetricNameSelector(String fieldName) {
-            this.fieldName = fieldName;
-        }
-
-        @Override
-        public String getMetricName(Map event) {
-            if (event.containsKey(fieldName)) {
-                return (String) event.get(fieldName);
-            } else {
-                throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event);
-            }
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
deleted file mode 100644
index 9135cc8..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/StormOutputCollector.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import backtype.storm.task.OutputCollector;
-
-import java.util.Arrays;
-
-import java.util.Map;
-
-/**
- * {@link Collector} that forwards every collected (key, event) pair to a Storm
- * {@link OutputCollector} as a two-element tuple.
- */
-public class StormOutputCollector implements Collector {
-    private final OutputCollector delegate;
-
-    StormOutputCollector(OutputCollector delegate) {
-        this.delegate = delegate;
-    }
-
-    /**
-     * Emits {@code key} and {@code event}, in that order, through the wrapped collector.
-     */
-    @Override
-    public void collect(Object key, Map event) {
-        delegate.emit(Arrays.<Object>asList(key, event));
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
deleted file mode 100644
index 11974ff..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * A named, serializable event transformation with an open/transform/close lifecycle.
- * TransformFunctionBolt drives it: open() from prepare(), transform() per tuple, close() from cleanup().
- */
-public interface TransformFunction extends Serializable {
- /** Returns the name of this function. */
- String getName();
-
- /** Hands the function the collector it is given for its output; called before any event is transformed. */
- void open(Collector collector);
-
- /** Processes a single event. */
- void transform(Map event);
-
- /** Called when the function is being shut down. */
- void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
deleted file mode 100644
index dbc7239..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/TransformFunctionBolt.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.app.utils.StreamConvertHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * Storm bolt that runs a {@link TransformFunction} over every incoming tuple.
- *
- * <p>Output produced by the function is emitted through a {@link StormOutputCollector} as
- * two-field tuples declared as {@code ("f1", "f2")}.
- */
-public class TransformFunctionBolt extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(TransformFunctionBolt.class);
-    private final TransformFunction function;
-    private OutputCollector collector;
-
-    public TransformFunctionBolt(TransformFunction function) {
-        this.function = function;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.function.open(new StormOutputCollector(collector));
-        this.collector = collector;
-    }
-
-    /**
-     * Transforms one tuple and acks it; on any error the error is reported and the tuple failed.
-     */
-    @Override
-    public void execute(Tuple input) {
-        try {
-            this.function.transform(StreamConvertHelper.tupleToEvent(input).f1());
-            this.collector.ack(input);
-        } catch (Throwable throwable) {
-            LOG.error("Transform error: {}", input, throwable);
-            this.collector.reportError(throwable);
-            // Every tuple must be either acked or failed; without an explicit fail the tuple
-            // would stay pending until the topology message timeout expires.
-            this.collector.fail(input);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("f1","f2"));
-    }
-
-    @Override
-    public void cleanup() {
-        this.function.close();
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
deleted file mode 100644
index 4c576cf..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkEnvironment.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.environment.AbstractEnvironment;
-import com.typesafe.config.Config;
-
-/**
- * Spark Execution Environment Context.
- */
-public class SparkEnvironment extends AbstractEnvironment {
- /** Passes the given config straight to the AbstractEnvironment constructor. */
- public SparkEnvironment(Config config) {
- super(config);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
deleted file mode 100644
index 1cad34f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import com.typesafe.config.Config;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-
-/**
- * Placeholder Spark runtime: every {@link ExecutionRuntime} operation throws a
- * {@link RuntimeException} because nothing is implemented yet.
- */
-public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> {
-    private static final String NOT_IMPLEMENTED = "Not implemented yet";
-
-    @Override
-    public void prepare(SparkEnvironment environment) {
-        throw new RuntimeException(NOT_IMPLEMENTED);
-    }
-
-    @Override
-    public SparkEnvironment environment() {
-        throw new RuntimeException(NOT_IMPLEMENTED);
-    }
-
-    @Override
-    public void start(Application executor, Config config) {
-        throw new RuntimeException(NOT_IMPLEMENTED);
-    }
-
-    @Override
-    public void stop(Application executor, Config config) {
-        throw new RuntimeException(NOT_IMPLEMENTED);
-    }
-
-    @Override
-    public ApplicationEntity.Status status(Application executor, Config config) {
-        throw new RuntimeException(NOT_IMPLEMENTED);
-    }
-
-    /** Provider that hands out a fresh {@link SparkExecutionRuntime} on every call. */
-    public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> {
-        @Override
-        public SparkExecutionRuntime get() {
-            return new SparkExecutionRuntime();
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java
deleted file mode 100644
index b5c7484..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticApplicationExecutor.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.StaticApplication;
-
-/**
- * Web Application Container.
- */
-public class StaticApplicationExecutor {
- // The wrapped application; set once in the constructor and never changed.
- private final StaticApplication webApplication;
-
- /** Wraps the given static application. */
- public StaticApplicationExecutor(StaticApplication webApplication) {
- this.webApplication = webApplication;
- }
-
- /** Returns the application passed to the constructor. */
- public StaticApplication getWebApplication() {
- return webApplication;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java
deleted file mode 100644
index 1c17b1f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticEnvironment.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.AbstractEnvironment;
-
-public class StaticEnvironment extends AbstractEnvironment {
- public StaticEnvironment(Config config) {
- super(config);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java
deleted file mode 100644
index b6e3fed..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StaticExecutionRuntime.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.app.environment.impl;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StaticExecutionRuntime.
- */
-public class StaticExecutionRuntime implements ExecutionRuntime<StaticEnvironment,StaticApplicationExecutor> {
- private static final Logger LOGGER = LoggerFactory.getLogger(StaticExecutionRuntime.class);
-
- private StaticEnvironment environment;
-
- @Override
- public void prepare(StaticEnvironment environment) {
- this.environment = environment;
- }
-
- @Override
- public StaticEnvironment environment() {
- return this.environment;
- }
-
- @Override
- public void start(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) {
- LOGGER.warn("Starting {}, do nothing",executor);
- }
-
- @Override
- public void stop(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) {
- LOGGER.warn("Stopping {}, do nothing",executor);
- }
-
- @Override
- public ApplicationEntity.Status status(Application<StaticEnvironment, StaticApplicationExecutor> executor, Config config) {
- LOGGER.warn("Checking status {}, do nothing",executor);
- return ApplicationEntity.Status.INITIALIZED;
- }
-
- public static class Provider implements ExecutionRuntimeProvider<StaticEnvironment,StaticApplicationExecutor> {
- @Override
- public ExecutionRuntime<StaticEnvironment, StaticApplicationExecutor> get() {
- return new StaticExecutionRuntime();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
deleted file mode 100644
index 6827eef..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.environment.AbstractEnvironment;
-import org.apache.eagle.app.environment.builder.ApplicationBuilder;
-import org.apache.eagle.app.environment.builder.MetricDescriptor;
-import org.apache.eagle.app.environment.builder.TransformFunction;
-import org.apache.eagle.app.environment.builder.TransformFunctionBolt;
-import org.apache.eagle.app.messaging.*;
-import com.typesafe.config.Config;
-
-/**
- * Storm Execution Environment Context.
- */
-public class StormEnvironment extends AbstractEnvironment {
- public StormEnvironment(Config envConfig) {
- super(envConfig);
- }
-
- // ----------------------------------
- // Classic Storm Topology Builder API
- // ----------------------------------
- public StormStreamSink getStreamSink(String streamId, Config config) {
- return ((StormStreamSink) stream().getSink(streamId,config));
- }
-
- public StormStreamSource getStreamSource(String streamId, Config config) {
- return (StormStreamSource) stream().getSource(streamId,config);
- }
-
- public MetricStreamPersist getMetricPersist(MetricDescriptor metricDescriptor, Config config) {
- return new MetricStreamPersist(metricDescriptor, config);
- }
-
- public EntityStreamPersist getEntityPersist(Config config) {
- return new EntityStreamPersist(config);
- }
-
- public MetricSchemaGenerator getMetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) {
- return new MetricSchemaGenerator(metricDescriptor, config);
- }
-
- public TransformFunctionBolt getTransformer(TransformFunction function) {
- return new TransformFunctionBolt(function);
- }
-
- // ----------------------------------
- // Fluent Storm App Builder API
- // ----------------------------------
-
- public ApplicationBuilder newApp(Config appConfig) {
- return new ApplicationBuilder(appConfig, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
deleted file mode 100644
index 2b4180d..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.*;
-import backtype.storm.utils.NimbusClient;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.ConfigRenderOptions;
-import org.apache.eagle.alert.engine.runner.StormMetricTaggedConsumer;
-import org.apache.eagle.alert.metric.MetricConfigs;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-import java.util.List;
-import java.util.Objects;
-
-public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
- private static final Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
- private static LocalCluster _localCluster;
-
- private StormEnvironment environment;
- private KillOptions killOptions;
-
- private static LocalCluster getLocalCluster() {
- if (_localCluster == null) {
- _localCluster = new LocalCluster();
- }
- return _localCluster;
- }
-
- public StormExecutionRuntime() {
- this.killOptions = new KillOptions();
- this.killOptions.set_wait_secs(0);
- }
-
- @Override
- public void prepare(StormEnvironment environment) {
- this.environment = environment;
- }
-
- @Override
- public StormEnvironment environment() {
- return this.environment;
- }
-
- public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
-
- private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
- private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
- private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
- private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
-
- private static final String WORKERS = "workers";
-
- private backtype.storm.Config getStormConfig(com.typesafe.config.Config config) {
- backtype.storm.Config conf = new backtype.storm.Config();
- conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
- conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
- conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
- conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
- conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
- conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
- String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
- if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
- nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
- LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
- } else {
- LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
- }
- Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT;
- if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
- nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
- LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
- } else {
- LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
- }
- conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
- conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
- conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "backtype.storm.security.auth.SimpleTransportPlugin");
- if (config.hasPath(WORKERS)) {
- conf.setNumWorkers(config.getInt(WORKERS));
- }
-
- if (config.hasPath(TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
- conf.put(TOPOLOGY_MESSAGE_TIMEOUT_SECS, config.getInt(TOPOLOGY_MESSAGE_TIMEOUT_SECS));
- }
-
- if (config.hasPath(MetricConfigs.METRIC_SINK_CONF)) {
- conf.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1);
- }
- return conf;
- }
-
- @Override
- public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
- String topologyName = config.getString("appId");
- Preconditions.checkNotNull(topologyName,"[appId] is required by null for " + executor.getClass().getCanonicalName());
- StormTopology topology = executor.execute(config, environment);
- LOG.info("Starting {} ({}), mode: {}",topologyName, executor.getClass().getCanonicalName(), config.getString("mode"));
- Config conf = getStormConfig(config);
- if (ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))) {
- String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
- if (jarFile == null) {
- jarFile = DynamicJarPathFinder.findPath(executor.getClass());
- }
- synchronized (StormExecutionRuntime.class) {
- System.setProperty("storm.jar", jarFile);
- LOG.info("Submitting as cluster mode ...");
- try {
- StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
- } catch (AlreadyAliveException | InvalidTopologyException e) {
- LOG.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(),e);
- } finally {
- System.clearProperty("storm.jar");
- }
- }
- } else {
- LOG.info("Submitting as local mode ...");
- getLocalCluster().submitTopology(topologyName, conf, topology);
- LOG.info("Submitted");
- }
- LOG.info("Started {} ({})",topologyName,executor.getClass().getCanonicalName());
- }
-
- @Override
- public void stop(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
- String appId = config.getString("appId");
- LOG.info("Stopping topology {} ...", appId);
- if (Objects.equals(config.getString("mode"), ApplicationEntity.Mode.CLUSTER.name())) {
- Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient();
- try {
- stormClient.killTopologyWithOpts(appId, this.killOptions);
- } catch (NotAliveException | TException e) {
- LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause());
- throw new RuntimeException(e.getMessage(),e);
- }
- } else {
- getLocalCluster().killTopologyWithOpts(appId, this.killOptions);
- }
- LOG.info("Stopped topology {}", appId);
- }
-
- @Override
- public ApplicationEntity.Status status(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
- String appId = config.getString("appId");
- LOG.info("Fetching {} status", appId);
- List<TopologySummary> topologySummaries ;
- ApplicationEntity.Status status = null;
- try {
- if (Objects.equals(config.getString("mode"), ApplicationEntity.Mode.CLUSTER.name())) {
- Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient();
- topologySummaries = stormClient.getClusterInfo().get_topologies();
- } else {
- topologySummaries = getLocalCluster().getClusterInfo().get_topologies();
- }
-
- for (TopologySummary topologySummary : topologySummaries) {
- if (topologySummary.get_name().equalsIgnoreCase(appId)) {
- if (topologySummary.get_status().equalsIgnoreCase("ACTIVE")) {
- status = ApplicationEntity.Status.RUNNING;
- } else if (topologySummary.get_status().equalsIgnoreCase("INACTIVE")) {
- status = ApplicationEntity.Status.STOPPED;
- } else if (topologySummary.get_status().equalsIgnoreCase("KILLED")) {
- status = ApplicationEntity.Status.STOPPING;
- } else {
- LOG.error("Unknown storm topology ({}) status: {}", topologySummary.get_status(),topologySummary.get_status());
- status = ApplicationEntity.Status.UNKNOWN;
- }
- }
- }
- // If no topology with this appId exists, report the status as REMOVED
- if (status == null) {
- status = ApplicationEntity.Status.REMOVED;
- }
- } catch (TException e) {
- LOG.error("Got error to fetch status of {}", appId, e);
- status = ApplicationEntity.Status.UNKNOWN;
- }
- LOG.info("{} status is {}", appId, status);
- return status;
- }
-
- public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
- @Override
- public StormExecutionRuntime get() {
- return new StormExecutionRuntime();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
deleted file mode 100644
index 76d6e1b..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import backtype.storm.Config;
-
-/**
- * Copied from Storm's StormSubmitter.
- * The stock StormSubmitter does not support submitting other jars once submittedJar is set;
- * this implementation does not impose that restriction.
- * Use this class to submit topologies to run on the Storm cluster. You should run your program
- * with the "storm jar" command from the command-line, and then use this class to
- * submit your topologies.
- */
-
-public class StormSubmitter {
- public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
-
- private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
-
- private static Nimbus.Iface localNimbus = null;
-
- public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
- StormSubmitter.localNimbus = localNimbusHandler;
- }
-
- /**
- * Submits a topology to run on the cluster. A topology runs forever or until
- * explicitly killed.
- *
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
- public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology, null, null);
- }
-
- /**
- * Submits a topology to run on the cluster. A topology runs forever or until
- * explicitly killed.
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @param opts to manipulate the starting of the topology.
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
- public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
- throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology, opts, null);
- }
-
- /**
- * Submits a topology to run on the cluster. A topology runs forever or until
- * explicitly killed.
- *
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @param opts to manipulate the starting of the topology
- * @param progressListener to track the progress of the jar upload process
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
- public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
- ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
- if (!Utils.isValidConf(stormConf)) {
- throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
- }
- stormConf = new HashMap(stormConf);
- stormConf.putAll(Utils.readCommandLineOpts());
- Map conf = Utils.readStormConfig();
- conf.putAll(stormConf);
- try {
- String serConf = JSONValue.toJSONString(stormConf);
- if (localNimbus != null) {
- LOG.info("Submitting topology " + name + " in local mode");
- localNimbus.submitTopology(name, null, serConf, topology);
- } else {
- NimbusClient client = NimbusClient.getConfiguredClient(conf);
- if (topologyNameExists(conf, name)) {
- throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
- }
- submitJar(conf, progressListener);
- try {
- LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
- if (opts != null) {
- client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
- } else {
- // this is for backwards compatibility
- client.getClient().submitTopology(name, submittedJar, serConf, topology);
- }
- } catch (InvalidTopologyException e) {
- LOG.warn("Topology submission exception: " + e.get_msg());
- throw e;
- } catch (AlreadyAliveException e) {
- LOG.warn("Topology already alive exception", e);
- throw e;
- } finally {
- client.close();
- }
- }
- LOG.info("Finished submitting topology: " + name);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
- * explicitly killed.
- *
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
-
- public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
- submitTopologyWithProgressBar(name, stormConf, topology, null);
- }
-
- /**
- * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
- * explicitly killed.
- *
- *
- * @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
- * @param topology the processing to execute.
- * @param opts to manipulate the starting of the topology
- * @throws AlreadyAliveException if a topology with this name is already running
- * @throws InvalidTopologyException if an invalid topology was submitted
- */
-
- public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
- // show a progress bar so we know we're not stuck (especially on slow connections)
- submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
- @Override
- public void onStart(String srcFile, String targetFile, long totalBytes) {
- System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
- }
-
- @Override
- public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
- int length = 50;
- int p = (int)((length * bytesUploaded) / totalBytes);
- String progress = StringUtils.repeat("=", p);
- String todo = StringUtils.repeat(" ", length - p);
-
- System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
- }
-
- @Override
- public void onCompleted(String srcFile, String targetFile, long totalBytes) {
- System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
- }
- });
- }
-
- private static boolean topologyNameExists(Map conf, String name) {
- NimbusClient client = NimbusClient.getConfiguredClient(conf);
- try {
- ClusterSummary summary = client.getClient().getClusterInfo();
- for (TopologySummary s : summary.get_topologies()) {
- if (s.get_name().equals(name)) {
- return true;
- }
- }
- return false;
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- client.close();
- }
- }
-
- private static String submittedJar = null;
-
- private static void submitJar(Map conf, ProgressListener listener) {
- LOG.info("before uploaded, submittedJar = {}", submittedJar);
- String localJar = System.getProperty("storm.jar");
- submittedJar = submitJar(conf, localJar, listener);
- LOG.info("after uploaded, submittedJar = {}", submittedJar);
- }
-
- /**
- * Submit jar file
- * @param conf the topology-specific configuration. See {@link Config}.
- * @param localJar file path of the jar file to submit
- * @return the remote location of the submitted jar
- */
- public static String submitJar(Map conf, String localJar) {
- return submitJar(conf, localJar, null);
- }
-
- /**
- * Submit jar file
- * @param conf the topology-specific configuration. See {@link Config}.
- * @param localJar file path of the jar file to submit
- * @param listener progress listener to track the jar file upload
- * @return the remote location of the submitted jar
- */
- public static String submitJar(Map conf, String localJar, ProgressListener listener) {
- if (localJar == null) {
- throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
- }
-
- NimbusClient client = NimbusClient.getConfiguredClient(conf);
- try {
- String uploadLocation = client.getClient().beginFileUpload();
- LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
- BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
-
- long totalSize = new File(localJar).length();
- if (listener != null) {
- listener.onStart(localJar, uploadLocation, totalSize);
- }
-
- long bytesUploaded = 0;
- while (true) {
- byte[] toSubmit = is.read();
- bytesUploaded += toSubmit.length;
- if (listener != null) {
- listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
- }
-
- if (toSubmit.length == 0) {
- break;
- }
- client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
- }
- client.getClient().finishFileUpload(uploadLocation);
-
- if (listener != null) {
- listener.onCompleted(localJar, uploadLocation, totalSize);
- }
-
- LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
- return uploadLocation;
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- client.close();
- }
- }
-
- /**
- * Interface used to track the progress of a file upload.
- */
- public interface ProgressListener {
- /**
- * called before file is uploaded.
- * @param srcFile - jar file to be uploaded
- * @param targetFile - destination file
- * @param totalBytes - total number of bytes of the file
- */
- public void onStart(String srcFile, String targetFile, long totalBytes);
-
- /**
- * called whenever a chunk of bytes is uploaded.
- * @param srcFile - jar file to be uploaded
- * @param targetFile - destination file
- * @param bytesUploaded - number of bytes transferred so far
- * @param totalBytes - total number of bytes of the file
- */
- public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
-
- /**
- * called when the file is uploaded.
- * @param srcFile - jar file to be uploaded
- * @param targetFile - destination file
- * @param totalBytes - total number of bytes of the file
- */
- public void onCompleted(String srcFile, String targetFile, long totalBytes);
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
deleted file mode 100644
index d1cecc9..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/DefaultStreamSinkConfig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-/**
- * {@link StreamSinkConfig} implementation that always reports the {@code "NONE"} storage type
- * and carries the class handed to its constructor, exposed through {@link #getSinkType()}.
- */
-public class DefaultStreamSinkConfig implements StreamSinkConfig {
-    /** Storage type reported by {@link #getType()}. */
-    private static final String NONE_STORAGE_TYPE = "NONE";
-
-    /** Class supplied at construction time; returned unchanged by {@link #getSinkType()}. */
-    private final Class<?> streamPersistClass;
-
-    /**
-     * Creates a sink config wrapping the given class.
-     *
-     * @param streamPersistClass class to be returned by {@link #getSinkType()}
-     */
-    public DefaultStreamSinkConfig(Class<?> streamPersistClass) {
-        this.streamPersistClass = streamPersistClass;
-    }
-
-    /**
-     * @return the class passed to the constructor
-     */
-    public Class<?> getSinkType() {
-        return this.streamPersistClass;
-    }
-
-    /**
-     * @return the constant storage type {@code "NONE"}
-     */
-    @Override
-    public String getType() {
-        return NONE_STORAGE_TYPE;
-    }
-
-    /**
-     * @return {@code DefaultStreamSinkConfig.class}
-     */
-    @Override
-    public Class<? extends StreamSinkConfig> getConfigType() {
-        return DefaultStreamSinkConfig.class;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
deleted file mode 100644
index e216dc6..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class EntityStreamPersist extends BaseRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(EntityStreamPersist.class);
-
- private final Config config;
- private IEagleServiceClient client;
- private OutputCollector collector;
- private int batchSize;
- private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>();
-
- public EntityStreamPersist(Config config) {
- this.config = config;
- this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.client = new EagleServiceClientImpl(config);
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- List<? extends TaggedLogAPIEntity> entities = (List<? extends TaggedLogAPIEntity>) input.getValue(0);
- entityBucket.addAll(entities);
-
- if (entityBucket.size() < batchSize) {
- return;
- }
-
- try {
- GenericServiceAPIResponseEntity response = client.create(entityBucket);
- if (response.isSuccess()) {
- LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
- collector.ack(input);
- } else {
- LOG.error("Service side error: {}", response.getException());
- collector.reportError(new IllegalStateException(response.getException()));
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- collector.fail(input);
- }
- entityBucket.clear();
- }
-
- @Override
- public void cleanup() {
- try {
- this.client.getJerseyClient().destroy();
- this.client.close();
- } catch (IOException e) {
- LOG.error("Close client error: {}", e.getMessage(), e);
- } finally {
- super.cleanup();
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
deleted file mode 100644
index c8fe1b5..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/FlattenEventMapper.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import backtype.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * {@link StreamEventMapper} that converts a tuple into exactly one {@link StreamEvent} whose
- * data array contains the tuple's values in field order.
- */
-public class FlattenEventMapper implements StreamEventMapper {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlattenEventMapper.class);
-    private static final String TIMESTAMP_FIELD = "timestamp";
-
-    private final String streamId;
-
-    /**
-     * @param streamId stream id assigned to every event produced by this mapper
-     */
-    public FlattenEventMapper(String streamId) {
-        this.streamId = streamId;
-    }
-
-    /**
-     * Builds a single event from the tuple.
-     *
-     * @param tuple tuple to convert
-     * @return a singleton list holding the event
-     */
-    @Override
-    public List<StreamEvent> map(Tuple tuple) throws Exception {
-        final long eventTime = resolveTimestamp(tuple);
-
-        final int fieldCount = tuple.getFields().size();
-        final Object[] data = new Object[fieldCount];
-        for (int idx = 0; idx < fieldCount; idx++) {
-            data[idx] = tuple.getValue(idx);
-        }
-
-        final StreamEvent mapped = new StreamEvent();
-        mapped.setTimestamp(eventTime);
-        mapped.setStreamId(streamId);
-        mapped.setData(data);
-        return Collections.singletonList(mapped);
-    }
-
-    /**
-     * Picks the event timestamp: the tuple's "timestamp" field when the tuple declares one,
-     * 0 when that field cannot be read as a long, and the current wall-clock time when the
-     * tuple has no such field.
-     */
-    private static long resolveTimestamp(Tuple tuple) {
-        if (!tuple.getFields().contains(TIMESTAMP_FIELD)) {
-            return System.currentTimeMillis();
-        }
-        try {
-            return tuple.getLongByField(TIMESTAMP_FIELD);
-        } catch (Exception ex) {
-            // The field is declared but its value could not be read as a long (e.g. null or
-            // a different type); log the problem and fall back to 0.
-            LOGGER.error(ex.getMessage(), ex);
-            return 0;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
deleted file mode 100644
index 987ed0b..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/JsonSchema.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.messaging;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * General JSON {@link Scheme}: parses the raw message bytes into a {@link Map} and emits the
- * map's hash code together with the map itself.
- * Differs from org.apache.eagle.alert.engine.scheme.JsonScheme, which is intended only for
- * multi-topic cases.
- *
- * @see org.apache.eagle.alert.engine.scheme.JsonScheme
- */
-public class JsonSchema implements Scheme {
-    private static final long serialVersionUID = -8352896475656975577L;
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonSchema.class);
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    /**
-     * @return the two output fields, {@code f1} and {@code f2}
-     */
-    @Override
-    public Fields getOutputFields() {
-        return new Fields("f1","f2");
-    }
-
-    /**
-     * Parses {@code ser} as a JSON object.
-     *
-     * @param ser raw message bytes, may be {@code null}
-     * @return a two-element list of the parsed map's hash code and the map, or {@code null}
-     *         when {@code ser} is {@code null} or cannot be parsed as JSON
-     */
-    @Override
-    @SuppressWarnings("rawtypes")
-    public List<Object> deserialize(byte[] ser) {
-        if (ser == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Content is null, ignore");
-            }
-            return null;
-        }
-        try {
-            Map parsed = mapper.readValue(ser, Map.class);
-            return Arrays.asList(parsed.hashCode(), parsed);
-        } catch (IOException e) {
-            // Decode with the Charset constant: unlike the charset-name overload it cannot
-            // throw UnsupportedEncodingException, so no nested catch-all is needed here.
-            LOG.error("Failed to deserialize as JSON: {}", new String(ser, java.nio.charset.StandardCharsets.UTF_8), e);
-            return null;
-        }
-    }
-}
\ No newline at end of file