You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:17 UTC
[09/84] [partial] eagle git commit: Clean repo for eagle site
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
deleted file mode 100644
index 2af49fb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.Models;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-/**
- * In-memory {@link IMetadataDao} implementation for simple service start-up.
- *
- * <p>All state lives in plain collections owned by this instance. Every public API is
- * {@code synchronized} on the instance, and every {@code list*} method returns a snapshot copy,
- * so callers can iterate a returned list while other threads keep modifying the store.
- *
- * @since Apr 11, 2016
- */
-public class InMemMetadataDaoImpl implements IMetadataDao {
-
-    private static final Logger LOG = LoggerFactory.getLogger(InMemMetadataDaoImpl.class);
-
-    /** Maximum number of schedule states retained; the lowest versions are evicted first. */
-    private static final int MAX_SCHEDULE_STATE = 100;
-
-    private final List<StreamingCluster> clusters = new ArrayList<>();
-    private final List<StreamDefinition> schemas = new ArrayList<>();
-    private final List<Kafka2TupleMetadata> datasources = new ArrayList<>();
-    private final List<PolicyDefinition> policies = new ArrayList<>();
-    private final List<Publishment> publishments = new ArrayList<>();
-    private final List<PublishmentType> publishmentTypes = new ArrayList<>();
-    /** Schedule states keyed by version string, ordered by the natural String ordering. */
-    private final SortedMap<String, ScheduleState> scheduleStates = new TreeMap<>();
-    private final List<PolicyAssignment> assignments = new ArrayList<>();
-    private final List<Topology> topologies = new ArrayList<>();
-    private final List<AlertPublishEvent> alerts = new ArrayList<>();
-
-    /**
-     * Creates an empty store.
-     *
-     * @param config injected configuration; not read by this implementation
-     */
-    @Inject
-    public InMemMetadataDaoImpl(Config config) {
-    }
-
-    /** Returns true when the metadata key of {@code item} equals {@code key}, ignoring case. */
-    private static boolean hasKey(Object item, String key) {
-        return MetadataUtils.getKey(item).equalsIgnoreCase(key);
-    }
-
-    /**
-     * Adds {@code item} to {@code items}, first removing the first existing element that has the
-     * same (case-insensitive) metadata key.
-     *
-     * @return a result with code 200 and a message telling whether an element was replaced
-     */
-    private synchronized <T> OpResult addOrReplace(List<T> items, T item) {
-        final String key = MetadataUtils.getKey(item);
-        boolean replaced = false;
-        for (Iterator<T> it = items.iterator(); it.hasNext(); ) {
-            if (hasKey(it.next(), key)) {
-                it.remove();
-                replaced = true;
-                break;
-            }
-        }
-
-        OpResult result = new OpResult();
-        result.message = replaced ? "replace the old one!" : "created new config!";
-        result.code = 200;
-        items.add(item);
-        return result;
-    }
-
-    /**
-     * Removes every element of {@code items} whose metadata key equals {@code id}, ignoring case.
-     *
-     * @return a result with code 200 and a message telling whether anything was removed
-     */
-    private synchronized <T> OpResult remove(List<T> items, String id) {
-        OpResult result = new OpResult();
-        result.code = 200;
-        if (items.removeIf(item -> hasKey(item, id))) {
-            result.message = "removed configuration item succeed";
-        } else {
-            result.message = "no configuration item removed";
-        }
-        return result;
-    }
-
-    /** Builds the plain "OK" result shared by several operations. */
-    private static OpResult ok() {
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        return result;
-    }
-
-    @Override
-    public synchronized List<StreamingCluster> listClusters() {
-        return new ArrayList<>(clusters);
-    }
-
-    @Override
-    public synchronized OpResult addCluster(final StreamingCluster cluster) {
-        return addOrReplace(clusters, cluster);
-    }
-
-    @Override
-    public synchronized OpResult removeCluster(final String clusterId) {
-        return remove(clusters, clusterId);
-    }
-
-    @Override
-    public synchronized List<StreamDefinition> listStreams() {
-        return new ArrayList<>(schemas);
-    }
-
-    @Override
-    public synchronized OpResult createStream(StreamDefinition stream) {
-        return addOrReplace(schemas, stream);
-    }
-
-    @Override
-    public synchronized OpResult removeStream(String streamId) {
-        return remove(schemas, streamId);
-    }
-
-    @Override
-    public synchronized List<Kafka2TupleMetadata> listDataSources() {
-        return new ArrayList<>(datasources);
-    }
-
-    @Override
-    public synchronized OpResult addDataSource(Kafka2TupleMetadata dataSource) {
-        return addOrReplace(datasources, dataSource);
-    }
-
-    @Override
-    public synchronized OpResult removeDataSource(String datasourceId) {
-        return remove(datasources, datasourceId);
-    }
-
-    @Override
-    public synchronized List<PolicyDefinition> listPolicies() {
-        return new ArrayList<>(policies);
-    }
-
-    @Override
-    public synchronized OpResult addPolicy(PolicyDefinition policy) {
-        return addOrReplace(policies, policy);
-    }
-
-    @Override
-    public synchronized OpResult removePolicy(String policyId) {
-        return remove(policies, policyId);
-    }
-
-    @Override
-    public synchronized List<Publishment> listPublishment() {
-        return new ArrayList<>(publishments);
-    }
-
-    @Override
-    public synchronized OpResult addPublishment(Publishment publishment) {
-        return addOrReplace(publishments, publishment);
-    }
-
-    @Override
-    public synchronized OpResult removePublishment(String pubId) {
-        return remove(publishments, pubId);
-    }
-
-    @Override
-    public synchronized List<PublishmentType> listPublishmentType() {
-        return new ArrayList<>(publishmentTypes);
-    }
-
-    @Override
-    public synchronized OpResult addPublishmentType(PublishmentType publishmentType) {
-        return addOrReplace(publishmentTypes, publishmentType);
-    }
-
-    @Override
-    public synchronized OpResult removePublishmentType(String pubType) {
-        return remove(publishmentTypes, pubType);
-    }
-
-    /**
-     * Lists the most recently added alert events.
-     *
-     * @param size number of trailing events wanted; when it is not positive or exceeds the number
-     *             of stored events, all events are returned
-     * @return a snapshot copy, oldest first
-     */
-    @Override
-    public synchronized List<AlertPublishEvent> listAlertPublishEvent(int size) {
-        final int total = alerts.size();
-        if (size > 0 && size <= total) {
-            // Copy: a bare subList view breaks as soon as another event is appended.
-            return new ArrayList<>(alerts.subList(total - size, total));
-        }
-        return new ArrayList<>(alerts);
-    }
-
-    /**
-     * Looks up one alert event by id.
-     *
-     * @return a matching event, or {@code null} when none is stored
-     */
-    @Override
-    public synchronized AlertPublishEvent getAlertPublishEvent(String alertId) {
-        return alerts.stream()
-            .filter(alert -> Objects.equals(alert.getAlertId(), alertId))
-            .findAny()
-            .orElse(null);
-    }
-
-    /**
-     * Lists the most recently added alert events of one policy.
-     *
-     * @param size number of trailing events wanted; a negative or too-large value selects all
-     *             matching events, zero selects none
-     */
-    @Override
-    public synchronized List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
-        List<AlertPublishEvent> result = alerts.stream()
-            .filter(alert -> Objects.equals(alert.getPolicyId(), policyId))
-            .collect(Collectors.toList());
-        if (size < 0 || size > result.size()) {
-            size = result.size();
-        }
-        return result.subList(result.size() - size, result.size());
-    }
-
-    @Override
-    public synchronized OpResult addAlertPublishEvent(AlertPublishEvent event) {
-        alerts.add(event);
-        OpResult result = new OpResult();
-        result.code = 200;
-        return result;
-    }
-
-    /**
-     * Stores a schedule state under its version and evicts the lowest versions so that at most
-     * {@link #MAX_SCHEDULE_STATE} states are retained.
-     */
-    @Override
-    public synchronized OpResult addScheduleState(ScheduleState state) {
-        scheduleStates.put(state.getVersion(), state);
-        // Trim after the insert so the cap holds whether the put added or replaced an entry.
-        while (scheduleStates.size() > MAX_SCHEDULE_STATE) {
-            scheduleStates.remove(scheduleStates.firstKey());
-        }
-        return ok();
-    }
-
-    /** Returns the state with the highest version key, or {@code null} when none is stored. */
-    @Override
-    public synchronized ScheduleState getScheduleState() {
-        return scheduleStates.isEmpty() ? null : scheduleStates.get(scheduleStates.lastKey());
-    }
-
-    /** Returns the state stored under {@code versionId}, or {@code null} when absent. */
-    @Override
-    public synchronized ScheduleState getScheduleState(String versionId) {
-        // TreeMap with natural ordering rejects null keys, so answer "absent" directly.
-        return versionId == null ? null : scheduleStates.get(versionId);
-    }
-
-    @Override
-    public List<ScheduleState> listScheduleStates() {
-        throw new UnsupportedOperationException("listScheduleStates not support!");
-    }
-
-    @Override
-    public OpResult clearScheduleState(int maxCapacity) {
-        throw new UnsupportedOperationException("clearScheduleState not support!");
-    }
-
-    @Override
-    public synchronized List<PolicyAssignment> listAssignments() {
-        return new ArrayList<>(assignments);
-    }
-
-    /** Appends the assignment; unlike the other add operations, no existing entry is replaced. */
-    @Override
-    public synchronized OpResult addAssignment(PolicyAssignment assignment) {
-        assignments.add(assignment);
-        return ok();
-    }
-
-    @Override
-    public synchronized List<Topology> listTopologies() {
-        return new ArrayList<>(topologies);
-    }
-
-    @Override
-    public synchronized OpResult addTopology(Topology t) {
-        return addOrReplace(topologies, t);
-    }
-
-    @Override
-    public synchronized OpResult removeTopology(String topologyName) {
-        return remove(topologies, topologyName);
-    }
-
-    /**
-     * Empties the collections that {@link #export()} and {@link #importModels(Models)} handle.
-     *
-     * <p>NOTE(review): publishment types and alert events are left untouched here, exactly as
-     * before; confirm whether that is intentional.
-     */
-    @Override
-    public synchronized OpResult clear() {
-        LOG.info("clear models...");
-        this.assignments.clear();
-        this.clusters.clear();
-        this.datasources.clear();
-        this.policies.clear();
-        this.publishments.clear();
-        this.scheduleStates.clear();
-        this.schemas.clear();
-        this.topologies.clear();
-        return ok();
-    }
-
-    /** Copies the current content of the store into a new {@link Models} instance. */
-    @Override
-    public synchronized Models export() {
-        Models models = new Models();
-        models.assignments.addAll(this.assignments);
-        models.clusters.addAll(this.clusters);
-        models.datasources.addAll(this.datasources);
-        models.policies.addAll(this.policies);
-        models.publishments.addAll(this.publishments);
-        models.scheduleStates.putAll(this.scheduleStates);
-        models.schemas.addAll(this.schemas);
-        models.topologies.addAll(this.topologies);
-        return models;
-    }
-
-    /**
-     * Replaces the content of the store with {@code models}. The clear and the import happen
-     * under one lock, so no other call can observe the half-empty store in between.
-     */
-    @Override
-    public synchronized OpResult importModels(Models models) {
-        LOG.info("clear and import models...");
-        clear();
-        this.assignments.addAll(models.assignments);
-        this.clusters.addAll(models.clusters);
-        this.datasources.addAll(models.datasources);
-        this.policies.addAll(models.policies);
-        this.publishments.addAll(models.publishments);
-        this.scheduleStates.putAll(models.scheduleStates);
-        this.schemas.addAll(models.schemas);
-        this.topologies.addAll(models.topologies);
-        return ok();
-    }
-
-    /** Nothing to release for the in-memory store. */
-    @Override
-    public void close() throws IOException {
-
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
deleted file mode 100644
index e0b5c9d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.Models;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * {@link IMetadataDao} implementation that delegates every operation to a
- * {@link JdbcMetadataHandler}. Operations the handler does not offer ({@code clear},
- * {@code export}, {@code importModels}) throw {@link UnsupportedOperationException}.
- *
- * @since May 26, 2016.
- */
-public class JdbcMetadataDaoImpl implements IMetadataDao {
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataDaoImpl.class);
-    private final JdbcMetadataHandler handler;
-
-    /**
-     * Creates the DAO and its handler.
-     *
-     * @param config application configuration; the sub-tree at {@link MetadataUtils#META_DATA}
-     *               is handed to the handler
-     */
-    @Inject
-    public JdbcMetadataDaoImpl(Config config) {
-        handler = new JdbcMetadataHandler(config.getConfig(MetadataUtils.META_DATA));
-    }
-
-    @Override
-    public List<Topology> listTopologies() {
-        return handler.list(Topology.class);
-    }
-
-    @Override
-    public List<StreamingCluster> listClusters() {
-        return handler.list(StreamingCluster.class);
-    }
-
-    @Override
-    public List<StreamDefinition> listStreams() {
-        return handler.list(StreamDefinition.class);
-    }
-
-    @Override
-    public List<Kafka2TupleMetadata> listDataSources() {
-        return handler.list(Kafka2TupleMetadata.class);
-    }
-
-    @Override
-    public List<PolicyDefinition> listPolicies() {
-        return handler.list(PolicyDefinition.class);
-    }
-
-    @Override
-    public List<Publishment> listPublishment() {
-        return handler.listPublishments();
-    }
-
-    /**
-     * Lists up to {@code size} alert events, newest first.
-     *
-     * @return an empty list when {@code size} is not positive
-     */
-    @Override
-    public List<AlertPublishEvent> listAlertPublishEvent(int size) {
-        if (size <= 0) {
-            LOG.info("Invalid parameter size <= 0");
-            return new ArrayList<>();
-        }
-        return handler.listAlertEvents(null, null, size);
-    }
-
-    /** Returns the policy stored under {@code policyId}, or {@code null} when absent. */
-    public PolicyDefinition getPolicyById(String policyId) {
-        return handler.queryById(PolicyDefinition.class, policyId);
-    }
-
-    public List<Publishment> getPublishmentsByPolicyId(String policyId) {
-        return handler.getPublishmentsByPolicyId(policyId);
-    }
-
-    @Override
-    public AlertPublishEvent getAlertPublishEvent(String alertId) {
-        return handler.getAlertEventById(alertId, 1);
-    }
-
-    /**
-     * Lists up to {@code size} alert events of one policy, newest first.
-     *
-     * @return an empty list when {@code size} is not positive
-     */
-    @Override
-    public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
-        if (size <= 0) {
-            LOG.info("Invalid parameter size <= 0");
-            return new ArrayList<>();
-        }
-        return handler.getAlertEventByPolicyId(policyId, size);
-    }
-
-    @Override
-    public ScheduleState getScheduleState(String versionId) {
-        return handler.queryById(ScheduleState.class, versionId);
-    }
-
-    /** Returns the first schedule state in descending id order, or {@code null} when none exists. */
-    @Override
-    public ScheduleState getScheduleState() {
-        List<ScheduleState> scheduleStates =
-            handler.list(ScheduleState.class, JdbcMetadataHandler.SortType.DESC);
-        return scheduleStates.isEmpty() ? null : scheduleStates.get(0);
-    }
-
-    @Override
-    public List<ScheduleState> listScheduleStates() {
-        return handler.list(ScheduleState.class);
-    }
-
-    @Override
-    public List<PolicyAssignment> listAssignments() {
-        return handler.list(PolicyAssignment.class);
-    }
-
-    @Override
-    public List<PublishmentType> listPublishmentType() {
-        return handler.list(PublishmentType.class);
-    }
-
-    @Override
-    public OpResult addTopology(Topology t) {
-        return handler.addOrReplace(Topology.class.getSimpleName(), t);
-    }
-
-    @Override
-    public OpResult addCluster(StreamingCluster cluster) {
-        return handler.addOrReplace(StreamingCluster.class.getSimpleName(), cluster);
-    }
-
-    @Override
-    public OpResult addAlertPublishEvent(AlertPublishEvent event) {
-        return handler.addAlertEvent(event);
-    }
-
-    @Override
-    public OpResult createStream(StreamDefinition stream) {
-        return handler.addOrReplace(StreamDefinition.class.getSimpleName(), stream);
-    }
-
-    @Override
-    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
-        return handler.addOrReplace(Kafka2TupleMetadata.class.getSimpleName(), dataSource);
-    }
-
-    @Override
-    public OpResult addPolicy(PolicyDefinition policy) {
-        return handler.addOrReplace(PolicyDefinition.class.getSimpleName(), policy);
-    }
-
-    @Override
-    public OpResult addPublishment(Publishment publishment) {
-        return handler.addOrReplace(Publishment.class.getSimpleName(), publishment);
-    }
-
-    @Override
-    public OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
-        return handler.addPublishmentsToPolicy(policyId, publishmentIds);
-    }
-
-    @Override
-    public OpResult addScheduleState(ScheduleState state) {
-        return handler.addOrReplace(ScheduleState.class.getSimpleName(), state);
-    }
-
-    /**
-     * Asks the handler to trim the stored schedule states.
-     *
-     * @param maxCapacity capacity passed to the handler; a non-positive value is replaced by 10
-     */
-    @Override
-    public OpResult clearScheduleState(int maxCapacity) {
-        if (maxCapacity <= 0) {
-            maxCapacity = 10;
-        }
-        OpResult result = handler.removeScheduleStates(maxCapacity);
-        LOG.info(result.message);
-        return result;
-    }
-
-    @Override
-    public OpResult addAssignment(PolicyAssignment assignment) {
-        return handler.addOrReplace(PolicyAssignment.class.getSimpleName(), assignment);
-    }
-
-    @Override
-    public OpResult addPublishmentType(PublishmentType publishmentType) {
-        return handler.addOrReplace(PublishmentType.class.getSimpleName(), publishmentType);
-    }
-
-    @Override
-    public OpResult removeTopology(String topologyName) {
-        return handler.removeById(Topology.class.getSimpleName(), topologyName);
-    }
-
-    @Override
-    public OpResult removeCluster(String clusterId) {
-        return handler.removeById(StreamingCluster.class.getSimpleName(), clusterId);
-    }
-
-    @Override
-    public OpResult removeStream(String streamId) {
-        return handler.removeById(StreamDefinition.class.getSimpleName(), streamId);
-    }
-
-    @Override
-    public OpResult removeDataSource(String datasourceId) {
-        return handler.removeById(Kafka2TupleMetadata.class.getSimpleName(), datasourceId);
-    }
-
-    @Override
-    public OpResult removePolicy(String policyId) {
-        return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId);
-    }
-
-    @Override
-    public OpResult removePublishment(String pubId) {
-        return handler.removeById(Publishment.class.getSimpleName(), pubId);
-    }
-
-    @Override
-    public OpResult removePublishmentType(String name) {
-        return handler.removeById(PublishmentType.class.getSimpleName(), name);
-    }
-
-    @Override
-    public OpResult clear() {
-        throw new UnsupportedOperationException("clear not support!");
-    }
-
-    @Override
-    public Models export() {
-        // The message used to be a copy of the one in clear(), which pointed at the wrong operation.
-        throw new UnsupportedOperationException("export not support!");
-    }
-
-    @Override
-    public OpResult importModels(Models models) {
-        // The message used to be a copy of the one in clear(), which pointed at the wrong operation.
-        throw new UnsupportedOperationException("importModels not support!");
-    }
-
-    /** Closes the underlying handler. */
-    @Override
-    public void close() throws IOException {
-        if (handler != null) {
-            handler.close();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
deleted file mode 100644
index a9e3c5e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.metadata.impl;
-
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.sql.DataSource;
-import java.io.IOException;
-import java.sql.*;
-import java.util.*;
-import java.util.function.Function;
-
-public class JdbcMetadataHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataHandler.class);
- // general model
- // Templates for the generic (id, content) tables; %s is filled with a table name via
- // String.format before the statement is prepared, the '?' markers are bound as parameters.
- private static final String INSERT_STATEMENT = "INSERT INTO %s(content, id) VALUES (?, ?)";
- private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE id=?";
- private static final String UPDATE_STATEMENT = "UPDATE %s set content=? WHERE id=?";
- private static final String QUERY_ALL_STATEMENT = "SELECT content FROM %s";
- private static final String QUERY_CONDITION_STATEMENT = "SELECT content FROM %s WHERE id=?";
- // Second %s is the sort direction (a SortType name).
- private static final String QUERY_ORDERBY_STATEMENT = "SELECT content FROM %s ORDER BY id %s";
-
- // customized model
- // Statements with fixed table names for schedule states, alert events and the
- // policy/publishment link table.
- private static final String CLEAR_SCHEDULESTATES_STATEMENT = "DELETE FROM schedule_state WHERE id NOT IN (SELECT id from (SELECT id FROM schedule_state ORDER BY id DESC limit ?) as states)";
- private static final String INSERT_ALERT_STATEMENT = "INSERT INTO alert_event(alertId, siteId, appIds, policyId, alertTimestamp, policyValue, alertData) VALUES (?, ?, ?, ?, ?, ?, ?)";
- private static final String QUERY_ALERT_STATEMENT = "SELECT * FROM alert_event order by alertTimestamp DESC limit ?";
- private static final String QUERY_ALERT_BY_ID_STATEMENT = "SELECT * FROM alert_event WHERE alertId=? order by alertTimestamp DESC limit ?";
- private static final String QUERY_ALERT_BY_POLICY_STATEMENT = "SELECT * FROM alert_event WHERE policyId=? order by alertTimestamp DESC limit ?";
- private static final String INSERT_POLICYPUBLISHMENT_STATEMENT = "INSERT INTO policy_publishment(policyId, publishmentName) VALUES (?, ?)";
- private static final String DELETE_PUBLISHMENT_STATEMENT = "DELETE FROM policy_publishment WHERE policyId=?";
- private static final String QUERY_PUBLISHMENT_BY_POLICY_STATEMENT = "SELECT content FROM publishment a INNER JOIN policy_publishment b ON a.id=b.publishmentName and b.policyId=?";
- private static final String QUERY_PUBLISHMENT_STATEMENT = "SELECT a.content, b.policyId FROM publishment a LEFT JOIN policy_publishment b ON a.id=b.publishmentName";
-
- // Sort direction; the constant name is inserted verbatim into QUERY_ORDERBY_STATEMENT.
- public enum SortType { DESC, ASC }
-
- // Maps a model class's simple name to its table name; filled by the static initializer below.
- private static Map<String, String> tblNameMap = new HashMap<>();
-
- // Shared JSON mapper used to (de)serialize the content column.
- private static final ObjectMapper mapper = new ObjectMapper();
- private DataSource dataSource;
-
- // Runs once at class load: relaxes the mapper so unknown JSON properties are ignored and
- // registers the table name of every supported model class. It must stay below the
- // declarations of mapper and tblNameMap, which it uses.
- static {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- registerTableName(StreamingCluster.class.getSimpleName(), "stream_cluster");
- registerTableName(StreamDefinition.class.getSimpleName(), "stream_definition");
- registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "kafka_tuple_metadata");
- registerTableName(PolicyDefinition.class.getSimpleName(), "policy_definition");
- registerTableName(Publishment.class.getSimpleName(), "publishment");
- registerTableName(PublishmentType.class.getSimpleName(), "publishment_type");
- registerTableName(ScheduleState.class.getSimpleName(), "schedule_state");
- registerTableName(PolicyAssignment.class.getSimpleName(), "policy_assignment");
- registerTableName(Topology.class.getSimpleName(), "topology");
- registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event");
- }
-
- // Records (or overwrites) the table name used for the given class simple name.
- private static void registerTableName(String clzName, String tblName) {
- tblNameMap.put(clzName, tblName);
- }
-
- /**
-  * Builds the DBCP data source from {@code config}.
-  *
-  * <p>The driver class and connection URL are read unconditionally; user name, password and
-  * connection properties are applied only when their paths exist.
-  *
-  * <p>Any failure is logged and swallowed. In that case {@code dataSource} stays {@code null}
-  * and later calls on this handler will fail.
-  *
-  * @param config configuration holding the JDBC settings under the {@link MetadataUtils} paths
-  */
- public JdbcMetadataHandler(Config config) {
-     try {
-         BasicDataSource bDatasource = new BasicDataSource();
-         bDatasource.setDriverClassName(config.getString(MetadataUtils.JDBC_DRIVER_PATH));
-         if (config.hasPath(MetadataUtils.JDBC_USERNAME_PATH)) {
-             bDatasource.setUsername(config.getString(MetadataUtils.JDBC_USERNAME_PATH));
-             // A user name without a password entry is legal; reading the missing path would
-             // throw and abort the whole data source set-up.
-             if (config.hasPath(MetadataUtils.JDBC_PASSWORD_PATH)) {
-                 bDatasource.setPassword(config.getString(MetadataUtils.JDBC_PASSWORD_PATH));
-             }
-         }
-         bDatasource.setUrl(config.getString(MetadataUtils.JDBC_CONNECTION_PATH));
-         if (config.hasPath(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH)) {
-             bDatasource.setConnectionProperties(config.getString(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH));
-         }
-         this.dataSource = bDatasource;
-     } catch (Exception e) {
-         LOG.error(e.getMessage(), e);
-     }
- }
-
- private String getTableName(String clzName) {
- String tbl = tblNameMap.get(clzName);
- if (tbl != null) {
- return tbl;
- } else {
- return clzName;
- }
- }
-
- /**
-  * Closes the given JDBC resources in the order result set, statement, connection.
-  * Each argument may be {@code null}; a failure to close one resource is logged and does not
-  * prevent the remaining ones from being closed.
-  */
- private void closeResource(ResultSet rs, PreparedStatement statement, Connection connection) {
-     if (rs != null) {
-         try {
-             rs.close();
-         } catch (SQLException e) {
-             LOG.info(e.getMessage(), e);
-         }
-     }
-     if (statement != null) {
-         try {
-             statement.close();
-         } catch (SQLException e) {
-             LOG.info("Failed to close statement: {}", e.getMessage(), e);
-         }
-     }
-     if (connection != null) {
-         try {
-             connection.close();
-         } catch (SQLException e) {
-             // Log the exception itself; its cause may be null and would hide the stack trace.
-             LOG.error("Failed to close connection: {}", e.getMessage(), e);
-         }
-     }
- }
-
- /**
-  * Runs a two-parameter write statement: parameter 1 is bound to {@code value} as a CLOB,
-  * parameter 2 to {@code key}. The connection is neither committed nor closed here.
-  *
-  * @param connection open connection to run the statement on
-  * @param query      SQL text with exactly two '?' markers (content first, id second)
-  * @param key        entity id
-  * @param value      serialized entity content
-  * @return a freshly created {@link OpResult}
-  * @throws SQLException if preparing, binding or executing the statement fails
-  */
- private OpResult executeUpdate(Connection connection, String query, String key, String value) throws SQLException {
-     OpResult result = new OpResult();
-     Clob clob = null;
-     try (PreparedStatement statement = connection.prepareStatement(query)) {
-         clob = connection.createClob();
-         clob.setString(1, value);
-         statement.setClob(1, clob);
-         statement.setString(2, key);
-         int status = statement.executeUpdate();
-         LOG.info("update {} with query={}", status, query);
-     } finally {
-         if (clob != null) {
-             try {
-                 // Release the CLOB's resources now instead of waiting for the transaction end.
-                 clob.free();
-             } catch (SQLException e) {
-                 // Best effort only: a driver that cannot free must not fail a finished write.
-                 LOG.debug("Failed to free clob: {}", e.getMessage(), e);
-             }
-         }
-     }
-     return result;
- }
-
-
- public <T> OpResult addOrReplace(String clzName, T t) {
- String tb = getTableName(clzName);
- OpResult result = new OpResult();
- Savepoint savepoint = null;
- String key = null;
- String value = null;
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- key = MetadataUtils.getKey(t);
- value = mapper.writeValueAsString(t);
- connection.setAutoCommit(false);
- savepoint = connection.setSavepoint("insertEntity");
- result = executeUpdate(connection, String.format(INSERT_STATEMENT, tb), key, value);
- connection.commit();
- } catch (SQLException e) {
- LOG.warn("fail to insert entity due to {}, and try to updated instead", e.getMessage());
- if (connection != null) {
- LOG.info("Detected duplicated entity");
- try {
- connection.rollback(savepoint);
- executeUpdate(connection, String.format(UPDATE_STATEMENT, tb), key, value);
- connection.commit();
- connection.setAutoCommit(true);
- } catch (SQLException e1) {
- LOG.warn("Rollback failed", e1);
- }
- }
- } catch (JsonProcessingException e) {
- LOG.error("Got JsonProcessingException: {}", e.getMessage(), e.getCause());
- result.code = OpResult.FAILURE;
- result.message = e.getMessage();
- } finally {
- closeResource(null, null, connection);
- }
- return result;
- }
-
-
- /**
-  * Loads every row of the table registered for {@code clz}, in no particular order.
-  *
-  * @see #list(Class, SortType)
-  */
- public <T> List<T> list(Class<T> clz) {
-     final SortType unordered = null;
-     return list(clz, unordered);
- }
-
- /**
-  * Loads every row of the table registered for {@code clz} and deserializes its content.
-  *
-  * @param clz      model class; selects the table and the deserialization target
-  * @param sortType ordering by id, or {@code null} for no ORDER BY clause
-  * @return the deserialized entities; an empty list when a {@link SQLException} occurs
-  *         (the error is logged)
-  */
- public <T> List<T> list(Class<T> clz, SortType sortType) {
-     Connection conn = null;
-     PreparedStatement stmt = null;
-     try {
-         final String table = getTableName(clz.getSimpleName());
-         final String sql = (sortType == null)
-             ? String.format(QUERY_ALL_STATEMENT, table)
-             : String.format(QUERY_ORDERBY_STATEMENT, table, sortType.toString());
-         conn = dataSource.getConnection();
-         stmt = conn.prepareStatement(sql);
-         return executeList(stmt, clz);
-     } catch (SQLException ex) {
-         LOG.error(ex.getMessage(), ex);
-         return new LinkedList<T>();
-     } finally {
-         closeResource(null, stmt, conn);
-     }
- }
-
- /**
-  * Executes the query and deserializes column 1 of every row into {@code clz}.
-  *
-  * @param statement prepared, fully bound query; it is not closed here
-  * @param clz       deserialization target
-  * @return the entities in result-set order
-  * @throws SQLException          if executing the query or closing the result set fails
-  * @throws IllegalStateException if reading or deserializing a row fails
-  */
- private <T> List<T> executeList(PreparedStatement statement, Class<T> clz) throws SQLException {
-     List<T> result = new LinkedList<>();
-     // try-with-resources: a failing close can no longer replace the exception of a failing row.
-     try (ResultSet rs = statement.executeQuery()) {
-         while (rs.next()) {
-             try {
-                 String content = rs.getString(1);
-                 result.add(mapper.readValue(content, clz));
-             } catch (Exception e) {
-                 throw new IllegalStateException(e);
-             }
-         }
-     }
-     return result;
- }
-
- /**
-  * Executes the query and maps every row with {@code selectFun}.
-  *
-  * @param statement prepared, fully bound query; it is not closed here
-  * @param selectFun converts the current row of the result set into one element
-  * @return the mapped elements in result-set order
-  * @throws SQLException if executing the query, advancing the cursor or closing the result
-  *                      set fails
-  */
- private <T> List<T> executeList(PreparedStatement statement, Function<ResultSet, T> selectFun) throws SQLException {
-     List<T> result = new LinkedList<>();
-     // try-with-resources: a failing close can no longer replace the exception of a failing row.
-     try (ResultSet rs = statement.executeQuery()) {
-         while (rs.next()) {
-             result.add(selectFun.apply(rs));
-         }
-     }
-     return result;
- }
-
- /**
-  * Fetches the entity stored under {@code id} in the table registered for {@code clz}.
-  *
-  * @return the first matching entity, or {@code null} when there is none or a
-  *         {@link SQLException} occurs (the error is logged)
-  */
- public <T> T queryById(Class<T> clz, String id) {
-     List<T> matches = new LinkedList<T>();
-     Connection conn = null;
-     PreparedStatement stmt = null;
-     try {
-         final String sql = String.format(QUERY_CONDITION_STATEMENT, getTableName(clz.getSimpleName()));
-         conn = dataSource.getConnection();
-         stmt = conn.prepareStatement(sql);
-         stmt.setString(1, id);
-         matches = executeList(stmt, clz);
-     } catch (SQLException ex) {
-         LOG.error(ex.getMessage(), ex);
-     } finally {
-         closeResource(null, stmt, conn);
-     }
-     return matches.isEmpty() ? null : matches.get(0);
- }
-
- /**
-  * Fetches an alert event by its alert id.
-  *
-  * @param alertId id to look for
-  * @param size    row limit handed to the query
-  * @return the first event returned by the query, or {@code null} when it returns none
-  */
- public AlertPublishEvent getAlertEventById(String alertId, int size) {
-     final List<AlertPublishEvent> found = listAlertEvents(QUERY_ALERT_BY_ID_STATEMENT, alertId, size);
-     return found.isEmpty() ? null : found.get(0);
- }
-
- /**
-  * Lists alert events of one policy using the by-policy query.
-  *
-  * @param policyId policy whose events are wanted
-  * @param size     row limit handed to the query
-  */
- public List<AlertPublishEvent> getAlertEventByPolicyId(String policyId, int size) {
-     final List<AlertPublishEvent> events = listAlertEvents(QUERY_ALERT_BY_POLICY_STATEMENT, policyId, size);
-     return events;
- }
-
- public List<AlertPublishEvent> listAlertEvents(String query, String filter, int size) {
- List<AlertPublishEvent> alerts = new LinkedList<>();
- Connection connection = null;
- PreparedStatement statement = null;
- try {
- connection = dataSource.getConnection();
- if (query == null) {
- query = QUERY_ALERT_STATEMENT;
- statement = connection.prepareStatement(query);
- statement.setInt(1, size);
- } else {
- statement = connection.prepareStatement(query);
- statement.setString(1, filter);
- statement.setInt(2, size);
- }
- alerts = executeList(statement, rs -> {
- try {
- AlertPublishEvent event = new AlertPublishEvent();
- event.setAlertId(rs.getString(1));
- event.setSiteId(rs.getString(2));
- event.setAppIds(mapper.readValue(rs.getString(3), List.class));
- event.setPolicyId(rs.getString(4));
- event.setAlertTimestamp(rs.getLong(5));
- event.setPolicyValue(rs.getString(6));
- event.setAlertData(mapper.readValue(rs.getString(7), Map.class));
- return event;
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- });
- } catch (SQLException ex) {
- LOG.error(ex.getMessage(), ex);
- } finally {
- closeResource(null, statement, connection);
- }
- return alerts;
- }
-
- public List<Publishment> listPublishments() {
- List<Publishment> result = new LinkedList<>();
- Connection connection = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- try {
- connection = dataSource.getConnection();
- statement = connection.prepareStatement(QUERY_PUBLISHMENT_STATEMENT);
- Map<String, List<String>> publishPolicyMap = new HashedMap();
- rs = statement.executeQuery();
- while (rs.next()) {
- String publishment = rs.getString(1);
- String policyId = rs.getString(2);
- List<String> policyIds = publishPolicyMap.get(publishment);
- if (policyIds == null) {
- policyIds = new ArrayList<>();
- publishPolicyMap.put(publishment, policyIds);
- }
- if (policyId != null) {
- policyIds.add(policyId);
- }
- }
- for (Map.Entry<String, List<String>> entry : publishPolicyMap.entrySet()) {
- Publishment publishment = mapper.readValue(entry.getKey(), Publishment.class);
- publishment.setPolicyIds(entry.getValue());
- result.add(publishment);
- }
- } catch (Exception ex) {
- LOG.error(ex.getMessage(), ex);
- } finally {
- closeResource(rs, statement, connection);
- }
- return result;
- }
-
- public List<Publishment> getPublishmentsByPolicyId(String policyId) {
- List<Publishment> result = new LinkedList<>();
- Connection connection = null;
- PreparedStatement statement = null;
- try {
- connection = dataSource.getConnection();
- statement = connection.prepareStatement(QUERY_PUBLISHMENT_BY_POLICY_STATEMENT);
- statement.setString(1, policyId);
- result = executeList(statement, Publishment.class);
- } catch (SQLException ex) {
- LOG.error(ex.getMessage(), ex);
- } finally {
- closeResource(null, statement, connection);
- }
- return result;
- }
-
- /**
-  * Inserts one alert event; {@code appIds} and {@code alertData} are stored as JSON strings.
-  *
-  * @param event the event to persist
-  * @return {@code OpResult.SUCCESS} with the inserted row count, or {@code OpResult.FAILURE}
-  *         carrying the exception message
-  */
- public OpResult addAlertEvent(AlertPublishEvent event) {
-     Connection connection = null;
-     PreparedStatement statement = null;
-     OpResult result = new OpResult();
-     try {
-         connection = dataSource.getConnection();
-         statement = connection.prepareStatement(INSERT_ALERT_STATEMENT);
-         statement.setString(1, event.getAlertId());
-         statement.setString(2, event.getSiteId());
-         statement.setString(3, mapper.writeValueAsString(event.getAppIds()));
-         statement.setString(4, event.getPolicyId());
-         statement.setLong(5, event.getAlertTimestamp());
-         statement.setString(6, event.getPolicyValue());
-         statement.setString(7, mapper.writeValueAsString(event.getAlertData()));
-         LOG.info("start to add alert event");
-         int status = statement.executeUpdate();
-         result.code = OpResult.SUCCESS;
-         result.message = String.format("add %d records into alert_event successfully", status);
-         LOG.info(result.message);
-     } catch (Exception ex) {
-         // Log with the stack trace: ex.getMessage() alone may be null and hides where the failure came from.
-         LOG.error("Error to add alert event {}", event == null ? null : event.getAlertId(), ex);
-         result.code = OpResult.FAILURE;
-         result.message = ex.getMessage();
-     } finally {
-         closeResource(null, statement, connection);
-     }
-     return result;
- }
-
- /**
-  * Replaces the publishments attached to a policy: deletes the existing links and inserts the
-  * given ones inside a single transaction.
-  *
-  * <p>If anything fails before the commit, the transaction is rolled back so the delete is not
-  * left half-applied, and auto-commit is switched back on before the connection is released.
-  *
-  * @param policyId       policy whose links are replaced
-  * @param publishmentIds publishment ids to link to the policy
-  * @return {@code OpResult.SUCCESS} with the summed batch counts, or {@code OpResult.FAILURE}
-  *         carrying the exception message
-  */
- public OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
-     OpResult result = new OpResult();
-     Connection connection = null;
-     PreparedStatement statement = null;
-     boolean committed = false;
-     try {
-         connection = dataSource.getConnection();
-         connection.setAutoCommit(false);
-         statement = connection.prepareStatement(DELETE_PUBLISHMENT_STATEMENT);
-         statement.setString(1, policyId);
-         int status = statement.executeUpdate();
-         LOG.info("delete {} records from policy_publishment", status);
-         closeResource(null, statement, null);
-
-         statement = connection.prepareStatement(INSERT_POLICYPUBLISHMENT_STATEMENT);
-         for (String pub : publishmentIds) {
-             statement.setString(1, policyId);
-             statement.setString(2, pub);
-             statement.addBatch();
-         }
-         int[] num = statement.executeBatch();
-         connection.commit();
-         committed = true;
-         int sum = 0;
-         for (int i : num) {
-             sum += i;
-         }
-         result.code = OpResult.SUCCESS;
-         result.message = String.format("Add %d records into policy_publishment", sum);
-     } catch (SQLException ex) {
-         LOG.error("Error to add publishments to policy {}", policyId, ex);
-         result.code = OpResult.FAILURE;
-         result.message = ex.getMessage();
-     } finally {
-         if (connection != null) {
-             try {
-                 // Roll back first: re-enabling auto-commit inside an open transaction would commit it.
-                 if (!committed) {
-                     connection.rollback();
-                 }
-                 connection.setAutoCommit(true);
-             } catch (SQLException cleanupEx) {
-                 LOG.warn("Failed to reset transaction state for policy {}", policyId, cleanupEx);
-             }
-         }
-         closeResource(null, statement, connection);
-     }
-     LOG.info(result.message);
-     return result;
- }
-
- /**
-  * Deletes the rows stored under {@code key} from the table mapped to {@code clzName}.
-  *
-  * @param clzName simple class name used to resolve the table
-  * @param key     value bound to the single statement parameter
-  * @return {@code OpResult.SUCCESS} with the deleted row count, or {@code OpResult.FAILURE}
-  *         carrying the exception message
-  */
- public OpResult removeById(String clzName, String key) {
-     Connection connection = null;
-     PreparedStatement statement = null;
-     OpResult result = new OpResult();
-     try {
-         String tb = getTableName(clzName);
-         connection = dataSource.getConnection();
-         statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
-         statement.setString(1, key);
-         LOG.info("start to delete records from {} with id={}", tb, key);
-         int status = statement.executeUpdate();
-         result.code = OpResult.SUCCESS;
-         result.message = String.format("removed %d records from %s successfully", status, tb);
-         LOG.info(result.message);
-     } catch (SQLException ex) {
-         // Log with the stack trace: ex.getMessage() alone may be null and hides where the failure came from.
-         LOG.error("Error to remove {} with id={}", clzName, key, ex);
-         result.code = OpResult.FAILURE;
-         result.message = ex.getMessage();
-     } finally {
-         closeResource(null, statement, connection);
-     }
-     return result;
- }
-
- /**
- * Currently a no-op: the only statement in the body is commented out, so nothing is released here.
- */
- public void close() throws IOException {
- // Disabled call, kept for reference:
- //JdbcSchemaManager.getInstance().shutdown();
- }
-
- /**
-  * Runs the schedule-state cleanup statement with {@code capacity} bound as its only parameter.
-  *
-  * @param capacity integer bound to parameter 1 of the cleanup statement
-  * @return {@code OpResult.SUCCESS} with the deleted row count, or {@code OpResult.FAILURE}
-  *         carrying the exception message
-  */
- public OpResult removeScheduleStates(int capacity) {
-     Connection connection = null;
-     PreparedStatement statement = null;
-     OpResult result = new OpResult();
-     try {
-         connection = dataSource.getConnection();
-         statement = connection.prepareStatement(CLEAR_SCHEDULESTATES_STATEMENT);
-         statement.setInt(1, capacity);
-         LOG.info("start to delete schedule states");
-         int status = statement.executeUpdate();
-         result.code = OpResult.SUCCESS;
-         result.message = String.format("removed %d records from schedule_state successfully", status);
-         LOG.info(result.message);
-     } catch (SQLException ex) {
-         // Log with the stack trace: ex.getMessage() alone may be null and hides where the failure came from.
-         LOG.error("Error to remove schedule states with capacity {}", capacity, ex);
-         result.code = OpResult.FAILURE;
-         result.message = ex.getMessage();
-     } finally {
-         closeResource(null, statement, connection);
-     }
-     return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
deleted file mode 100644
index a02c51e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.metadata.impl;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import com.typesafe.config.Config;
-import org.apache.ddlutils.Platform;
-import org.apache.ddlutils.PlatformFactory;
-import org.apache.ddlutils.model.Column;
-import org.apache.ddlutils.model.Database;
-import org.apache.ddlutils.model.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-@Deprecated
-public class JdbcSchemaManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(JdbcSchemaManager.class);
- private Database database;
- private Platform platform;
-
- private static JdbcSchemaManager instance;
-
- public static Map<String, String> tblNameMap = new HashMap<>();
-
- private JdbcSchemaManager() {
- }
-
- private static void registerTableName(String clzName, String tblName) {
- tblNameMap.put(clzName, tblName);
- }
-
- static {
- registerTableName(StreamingCluster.class.getSimpleName(), "cluster");
- registerTableName(StreamDefinition.class.getSimpleName(), "stream_schema");
- registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "datasource");
- registerTableName(PolicyDefinition.class.getSimpleName(), "policy");
- registerTableName(Publishment.class.getSimpleName(), "publishment");
- registerTableName(PublishmentType.class.getSimpleName(), "publishment_type");
- registerTableName(ScheduleState.class.getSimpleName(), "schedule_state");
- registerTableName(PolicyAssignment.class.getSimpleName(), "assignment");
- registerTableName(Topology.class.getSimpleName(), "topology");
- registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event");
- }
-
- public static JdbcSchemaManager getInstance() {
- if (instance == null) {
- instance = new JdbcSchemaManager();
- }
- return instance;
- }
-
- public void init(Config config) {
- Connection connection = null;
- try {
- this.platform = PlatformFactory.createNewPlatformInstance("mysql");
-
- connection = MetadataUtils.getJdbcConnection(config);
- String dbName = config.getString(MetadataUtils.JDBC_DATABASE_PATH);
- this.database = platform.readModelFromDatabase(connection, dbName);
- LOG.info("Loaded " + database);
-
- Database _database = identifyNewTables();
- if (_database.getTableCount() > 0) {
- LOG.info("Creating {} new tables (totally {} tables)", _database.getTableCount(), database.getTableCount());
- this.platform.createTables(connection, _database, false, true);
- LOG.info("Created {} new tables: ", _database.getTableCount(), _database.getTables());
- } else {
- LOG.debug("All the {} tables have already been created, no new tables", database.getTableCount());
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new IllegalStateException(e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.warn(e.getMessage(), e);
- }
- }
- }
- }
-
- private Database identifyNewTables() {
- Database _database = new Database();
- _database.setName(database.getName());
- Collection<String> tableNames = tblNameMap.values();
- LOG.info("Initializing database and creating tables");
- for (String tableName : tableNames) {
- if (database.findTable(tableName) == null) {
- Table table = createTable(tableName);
- LOG.info("Creating {}", table.toVerboseString());
- _database.addTable(table);
- database.addTable(table);
- } else {
- LOG.debug("Table {} already exists", tableName);
- }
- }
- return _database;
- }
-
- public void shutdown() {
- this.platform.shutdownDatabase();
- }
-
- private Table createTable(String tableName) {
- Table table = new Table();
- table.setName(tableName);
- buildTable(table);
- return table;
- }
-
- private void buildTable(Table table) {
- Column id = new Column();
- id.setName("id");
- id.setPrimaryKey(true);
- id.setRequired(true);
- id.setTypeCode(Types.VARCHAR);
- id.setSize("50");
- table.addColumn(id);
-
- Column value = new Column();
- value.setName("value");
- value.setTypeCode(Types.CLOB);
- table.addColumn(value);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
deleted file mode 100644
index 5082e50..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Constructor;
-
-/**
- * @since Apr 12, 2016.
- */
-public class MetadataDaoFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class);
-
- private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory();
-
- private IMetadataDao dao;
-
- private MetadataDaoFactory() {
- Config config = ConfigFactory.load();
- if (!config.hasPath(MetadataUtils.META_DATA)) {
- LOG.warn("metadata is not configured, use in-memory store !!!");
- dao = new InMemMetadataDaoImpl(null);
- } else {
- Config metaDataConfig = config.getConfig(MetadataUtils.META_DATA);
- try {
- String clsName = metaDataConfig.getString(MetadataUtils.ALERT_META_DATA_DAO);
- Class<?> clz;
- clz = Thread.currentThread().getContextClassLoader().loadClass(clsName);
- if (IMetadataDao.class.isAssignableFrom(clz)) {
- Constructor<?> cotr = clz.getConstructor(Config.class);
- LOG.info("metadata.alertMetadataDao loaded: " + clsName);
- dao = (IMetadataDao) cotr.newInstance(metaDataConfig);
- } else {
- throw new Exception("metadata.metadataDao configuration need to be implementation of IMetadataDao! ");
- }
- } catch (Exception e) {
- LOG.error("error when initialize the dao, fall back to in memory mode!", e);
- dao = new InMemMetadataDaoImpl(metaDataConfig);
- }
- }
- }
-
- public static MetadataDaoFactory getInstance() {
- return INSTANCE;
- }
-
- public IMetadataDao getMetadataDao() {
- return dao;
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
deleted file mode 100644
index 2325f90..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ /dev/null
@@ -1,753 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.VersionedPolicyDefinition;
-import org.apache.eagle.alert.coordination.model.VersionedStreamDefinition;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.ScheduleStateBase;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.Models;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.bson.BsonDocument;
-import org.bson.BsonInt32;
-import org.bson.BsonString;
-import org.bson.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import com.mongodb.Block;
-import com.mongodb.Function;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.MongoIterable;
-import com.mongodb.client.model.CreateCollectionOptions;
-import com.mongodb.client.model.IndexOptions;
-import com.mongodb.client.model.UpdateOptions;
-import com.mongodb.client.result.DeleteResult;
-import com.mongodb.client.result.UpdateResult;
-import com.typesafe.config.Config;
-
-/**
- * @since Apr 11, 2016.
- */
-public class MongoMetadataDaoImpl implements IMetadataDao {
-
- private static final String DEFAULT_DB_NAME = "ump_alert_metadata";
- private static final Logger LOG = LoggerFactory.getLogger(MongoMetadataDaoImpl.class);
- private static final ObjectMapper mapper = new ObjectMapper();
- private static final int DEFAULT_CAPPED_MAX_SIZE = 500 * 1024 * 1024;
- private static final int DEFAULT_CAPPED_MAX_DOCUMENTS = 20000;
- private static final String MONGO_CAPPED_MAX_SIZE = "mongo.cappedMaxSize";
- private static final String MONGO_CAPPED_MAX_DOCUMENTS = "mongo.cappedMaxDocuments";
-
- static {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- private final String connection;
- private final String dbname;
- private final MongoClient client;
- private final int cappedMaxSize;
- private final int cappedMaxDocuments;
-
- private MongoDatabase db;
- private MongoCollection<Document> cluster;
- private MongoCollection<Document> schema;
- private MongoCollection<Document> datasource;
- private MongoCollection<Document> policy;
- private MongoCollection<Document> publishment;
- private MongoCollection<Document> publishmentType;
- private MongoCollection<Document> topologies;
- private MongoCollection<Document> alerts;
-
- // scheduleStates splits to several collections
- private MongoCollection<Document> scheduleStates;
- private MongoCollection<Document> spoutSpecs;
- private MongoCollection<Document> alertSpecs;
- private MongoCollection<Document> groupSpecs;
- private MongoCollection<Document> publishSpecs;
- private MongoCollection<Document> policySnapshots;
- private MongoCollection<Document> streamSnapshots;
- private MongoCollection<Document> monitoredStreams;
- private MongoCollection<Document> assignments;
-
- @Inject
- public MongoMetadataDaoImpl(Config config) {
- this.connection = config.getString(MetadataUtils.MONGO_CONNECTION_PATH);
- this.cappedMaxSize = config.hasPath(MONGO_CAPPED_MAX_SIZE) ? config.getInt(MONGO_CAPPED_MAX_SIZE) : DEFAULT_CAPPED_MAX_SIZE;
- this.cappedMaxDocuments = config.hasPath(MONGO_CAPPED_MAX_DOCUMENTS) ? config.getInt(MONGO_CAPPED_MAX_DOCUMENTS) : DEFAULT_CAPPED_MAX_DOCUMENTS;
- this.client = new MongoClient(new MongoClientURI(this.connection));
- this.dbname = config.hasPath(MetadataUtils.MONGO_DATABASE) ? config.getString(MetadataUtils.MONGO_DATABASE) : DEFAULT_DB_NAME;
- init();
- }
-
- private boolean isCollectionExists(String collectionName) {
- boolean result = false;
- MongoIterable<String> allCollections = db.listCollectionNames();
- for ( String collection : allCollections ) {
- if (collection.equals(collectionName)) {
- result = true;
- break;
- }
- }
-
- return result;
- }
-
- private MongoCollection<Document> getCollection(String collectionName) {
- // first check if collection exists, if not then create a new collection with cappedSize
- if (!isCollectionExists(collectionName)) {
- CreateCollectionOptions option = new CreateCollectionOptions();
- option.capped(true);
- option.maxDocuments(cappedMaxDocuments);
- option.sizeInBytes(cappedMaxSize);
- db.createCollection(collectionName, option);
- }
-
- return db.getCollection(collectionName);
-
- }
-
- /**
- * Binds every collection field to the configured database and requests an index on each.
- *
- * <p>The plain metadata collections are obtained with db.getCollection; schedule_specs and the
- * collections it is split into are obtained through getCollection(String), which creates them
- * as capped collections when they are missing.
- *
- * <p>NOTE(review): the options object {@code io} (index name "nameIndex") is also used for the
- * "streamId" index on schemas and for the "version" index on policySnapshots, streamSnapshots and
- * monitoredStreams, so those index names do not describe the indexed key. Existing databases
- * may already hold indexes under these names - confirm before changing them.
- */
- private void init() {
- db = client.getDatabase(this.dbname);
- // io/doc: background index on "name", shared by most metadata collections below
- IndexOptions io = new IndexOptions().background(true).name("nameIndex");
- BsonDocument doc = new BsonDocument();
- doc.append("name", new BsonInt32(1));
- cluster = db.getCollection("clusters");
- cluster.createIndex(doc, io);
- {
- // schemas are indexed by "streamId" (index options still those of io)
- BsonDocument doc2 = new BsonDocument();
- doc2.append("streamId", new BsonInt32(1));
- schema = db.getCollection("schemas");
- schema.createIndex(doc2, io);
- }
- datasource = db.getCollection("datasources");
- datasource.createIndex(doc, io);
- policy = db.getCollection("policies");
- policy.createIndex(doc, io);
- publishment = db.getCollection("publishments");
- publishment.createIndex(doc, io);
- topologies = db.getCollection("topologies");
- topologies.createIndex(doc, io);
- publishmentType = db.getCollection("publishmentTypes");
- publishmentType.createIndex(doc, io);
-
- alerts = db.getCollection("alerts");
- {
- // alerts get a unique index on "alertId"
- IndexOptions io1 = new IndexOptions().background(true).unique(true).name("alertIndex");
- BsonDocument doc1 = new BsonDocument();
- doc1.append("alertId", new BsonInt32(1));
- alerts.createIndex(doc1, io1);
- }
-
-
- // below is for schedule_specs and the collections it is split into
- // io1/doc1: background index on "version" named "versionIndex"
- BsonDocument doc1 = new BsonDocument();
- IndexOptions io1 = new IndexOptions().background(true).name("versionIndex");
- doc1.append("version", new BsonInt32(1));
- scheduleStates = getCollection("schedule_specs");
- scheduleStates.createIndex(doc1, io1);
-
- spoutSpecs = getCollection("spoutSpecs");
- {
- IndexOptions ioInternal = new IndexOptions().background(true).name("topologyIdIndex");
- BsonDocument docInternal = new BsonDocument();
- docInternal.append("topologyId", new BsonInt32(1));
- spoutSpecs.createIndex(docInternal, ioInternal);
- }
-
- alertSpecs = getCollection("alertSpecs");
- {
- IndexOptions ioInternal = new IndexOptions().background(true).name("topologyNameIndex");
- BsonDocument docInternal = new BsonDocument();
- docInternal.append("topologyName", new BsonInt32(1));
- alertSpecs.createIndex(docInternal, ioInternal);
- }
-
- groupSpecs = getCollection("groupSpecs");
- groupSpecs.createIndex(doc1, io1);
-
- publishSpecs = getCollection("publishSpecs");
- publishSpecs.createIndex(doc1, io1);
-
- // the next three index "version" (doc1) but pass io, i.e. the index is named "nameIndex"
- policySnapshots = getCollection("policySnapshots");
- policySnapshots.createIndex(doc1, io);
-
- streamSnapshots = getCollection("streamSnapshots");
- streamSnapshots.createIndex(doc1, io);
-
- monitoredStreams = getCollection("monitoredStreams");
- monitoredStreams.createIndex(doc1, io);
-
- assignments = getCollection("assignments");
- assignments.createIndex(doc1, io1);
- }
-
- /** Lists the clusters by delegating to {@code list} on the cluster collection. */
- @Override
- public List<StreamingCluster> listClusters() {
- return list(cluster, StreamingCluster.class);
- }
-
-
- private <T> OpResult addOrReplace(MongoCollection<Document> collection, T t) {
- BsonDocument filter = new BsonDocument();
- if (t instanceof StreamDefinition) {
- filter.append("streamId", new BsonString(MetadataUtils.getKey(t)));
- } else if (t instanceof AlertPublishEvent) {
- filter.append("alertId", new BsonString(MetadataUtils.getKey(t)));
- } else {
- filter.append("name", new BsonString(MetadataUtils.getKey(t)));
- }
-
- String json = "";
- OpResult result = new OpResult();
- try {
- json = mapper.writeValueAsString(t);
- UpdateOptions options = new UpdateOptions();
- options.upsert(true);
- UpdateResult ur = collection.replaceOne(filter, Document.parse(json), options);
- // FIXME: could based on matched count do better matching...
- if (ur.getModifiedCount() > 0 || ur.getUpsertedId() != null) {
- result.code = 200;
- result.message = String.format("update %d configuration item.", ur.getModifiedCount());
- } else {
- result.code = 500;
- result.message = "no configuration item create/updated.";
- }
- } catch (Exception e) {
- result.code = 500;
- result.message = e.getMessage();
- LOG.error("", e);
- }
- return result;
- }
-
- /** Removes the item whose "name" field equals {@code name}; shorthand for {@code removeObject}. */
- private <T> OpResult remove(MongoCollection<Document> collection, String name) {
- return removeObject(collection, "name", name);
- }
-
- private <T> OpResult removeObject(MongoCollection<Document> collection, String nameField, String name) {
- BsonDocument filter = new BsonDocument();
- filter.append(nameField, new BsonString(name));
- DeleteResult dr = collection.deleteOne(filter);
- OpResult result = new OpResult();
- result.code = 200;
- result.message = String.format(" %d config item removed!", dr.getDeletedCount());
- return result;
- }
-
- /** Upserts the cluster into the cluster collection via {@code addOrReplace}. */
- @Override
- public OpResult addCluster(StreamingCluster cluster) {
- return addOrReplace(this.cluster, cluster);
- }
-
- /** Deletes the cluster whose "name" equals {@code clusterId}. */
- @Override
- public OpResult removeCluster(String clusterId) {
- return remove(cluster, clusterId);
- }
-
- /** Lists the stream definitions by delegating to {@code list} on the schema collection. */
- @Override
- public List<StreamDefinition> listStreams() {
- return list(schema, StreamDefinition.class);
- }
-
- /** Upserts the stream definition into the schema collection via {@code addOrReplace}. */
- @Override
- public OpResult createStream(StreamDefinition stream) {
- return addOrReplace(this.schema, stream);
- }
-
- /** Deletes the stream definition whose "streamId" equals {@code streamId}. */
- @Override
- public OpResult removeStream(String streamId) {
- return removeObject(schema, "streamId", streamId);
- }
-
- /** Lists the data sources by delegating to {@code list} on the datasource collection. */
- @Override
- public List<Kafka2TupleMetadata> listDataSources() {
- return list(datasource, Kafka2TupleMetadata.class);
- }
-
- /** Upserts the data source into the datasource collection via {@code addOrReplace}. */
- @Override
- public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
- return addOrReplace(this.datasource, dataSource);
- }
-
- /** Deletes the data source whose "name" equals {@code datasourceId}. */
- @Override
- public OpResult removeDataSource(String datasourceId) {
- return remove(datasource, datasourceId);
- }
-
- /** Lists the policies by delegating to {@code list} on the policy collection. */
- @Override
- public List<PolicyDefinition> listPolicies() {
- return list(policy, PolicyDefinition.class);
- }
-
- /** Upserts the policy into the policy collection via {@code addOrReplace}. */
- @Override
- public OpResult addPolicy(PolicyDefinition policy) {
- return addOrReplace(this.policy, policy);
- }
-
- /** Deletes the policy whose "name" equals {@code policyId}. */
- @Override
- public OpResult removePolicy(String policyId) {
- return remove(policy, policyId);
- }
-
- /** Lists the publishments by delegating to {@code list} on the publishment collection. */
- @Override
- public List<Publishment> listPublishment() {
- return list(publishment, Publishment.class);
- }
-
- /** Upserts the publishment into the publishment collection via {@code addOrReplace}. */
- @Override
- public OpResult addPublishment(Publishment publishment) {
- return addOrReplace(this.publishment, publishment);
- }
-
- /** Deletes the publishment whose "name" equals {@code pubId}. */
- @Override
- public OpResult removePublishment(String pubId) {
- return remove(publishment, pubId);
- }
-
- /** Lists the publishment types by delegating to {@code list} on the publishmentType collection. */
- @Override
- public List<PublishmentType> listPublishmentType() {
- return list(publishmentType, PublishmentType.class);
- }
-
- /** Upserts the publishment type into the publishmentType collection via {@code addOrReplace}. */
- @Override
- public OpResult addPublishmentType(PublishmentType pubType) {
- return addOrReplace(this.publishmentType, pubType);
- }
-
- /** Deletes the publishment type whose "name" equals {@code pubType}. */
- @Override
- public OpResult removePublishmentType(String pubType) {
- return remove(publishmentType, pubType);
- }
-
- /**
-  * Returns the last {@code size} alert events in the order {@code list} produced them.
-  *
-  * @param size number of trailing events wanted; a negative value or one larger than the number
-  *             of stored events selects all of them
-  * @return a {@code subList} view over the tail of the loaded events
-  */
- @Override
- public List<AlertPublishEvent> listAlertPublishEvent(int size) {
-     final List<AlertPublishEvent> allEvents = list(alerts, AlertPublishEvent.class);
-     final int total = allEvents.size();
-     final int wanted = (size >= 0 && size <= total) ? size : total;
-     return allEvents.subList(total - wanted, total);
- }
-
- /**
-  * Finds an alert event by id among all events returned by {@code list}.
-  *
-  * @param alertId id to compare each event's alert id against
-  * @return a matching event, or {@code null} when none matches
-  */
- @Override
- public AlertPublishEvent getAlertPublishEvent(String alertId) {
-     return list(alerts, AlertPublishEvent.class)
-         .stream()
-         .filter(candidate -> candidate.getAlertId().equals(alertId))
-         .findAny()
-         .orElse(null);
- }
-
- @Override
- public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
- List<AlertPublishEvent> events = list(alerts, AlertPublishEvent.class);
- List<AlertPublishEvent> result = events.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList());
- if (size < 0 || size > result.size()) {
- size = result.size();
- }
- return events.subList(result.size() - size, result.size());
- }
-
- /** Upserts the alert event into the alerts collection via {@code addOrReplace}. */
- @Override
- public OpResult addAlertPublishEvent(AlertPublishEvent event) {
- return addOrReplace(alerts, event);
- }
-
- private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
- OpResult result = new OpResult();
- String json = "";
- try {
- json = mapper.writeValueAsString(t);
- collection.insertOne(Document.parse(json));
- result.code = 200;
- result.message = String.format("add one document [%s] to collection [%s] succeed!", json, collection.getNamespace());
- LOG.info(result.message);
- } catch (Exception e) {
- result.code = 400;
- result.message = e.getMessage();
- LOG.error(String.format("Add one document [%s] to collection [%s] failed!", json, collection.getNamespace()), e);
- }
- return result;
- }
-
- /**
- * Due to some field name in SpoutSpec contains dot(.) which is invalid Mongo Field name, we need to transform the
- * format to store in Mongo.
- * @return opresult
- */
- private <T> OpResult addOneSpoutSpec(T t) {
- OpResult result = new OpResult();
- String json = "";
- try {
- json = mapper.writeValueAsString(t);
- Document doc = Document.parse(json);
-
- String [] metadataMapArrays = {"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", "streamRepartitionMetadataMap"};
- for (String metadataMapName: metadataMapArrays) {
- Document _metadataMapDoc = (Document) doc.get(metadataMapName);
- doc.remove(metadataMapName);
-
- ArrayList<Document> _metadataMapArray = new ArrayList<>();
-
- for ( String key : _metadataMapDoc.keySet()) {
- Document _subDoc = new Document();
- _subDoc.put("topicName", key);
- _subDoc.put(metadataMapName, _metadataMapDoc.get(key));
- _metadataMapArray.add(_subDoc);
- }
- doc.append(metadataMapName, _metadataMapArray);
- }
-
- spoutSpecs.insertOne(doc);
- result.code = 200;
- result.message = String.format("add one document [%s] to collection [%s] succeed!", doc.toJson(), spoutSpecs.getNamespace());
- LOG.info(result.message);
- } catch (Exception e) {
- result.code = 400;
- result.message = e.getMessage();
- LOG.error(String.format("Add one document [%s] to collection [%s] failed!", json, spoutSpecs.getNamespace()), e);
- }
- return result;
- }
-
- /**
-  * Loads the schedule state stored under the given version and fills in its details from
-  * the per-version collections.
-  *
-  * @param versionId value matched against the {@code version} field
-  * @return the completed schedule state, or {@code null} when no state was obtained
-  */
- @Override
- public ScheduleState getScheduleState(String versionId) {
-     final BsonDocument byVersion = new BsonDocument();
-     byVersion.append("version", new BsonString(versionId));
-
-     // A document that cannot be deserialized is logged and mapped to null.
-     final Function<Document, ScheduleState> decode = raw -> {
-         String json = raw.toJson();
-         try {
-             return mapper.readValue(json, ScheduleState.class);
-         } catch (IOException e) {
-             LOG.error("deserialize config item failed!", e);
-         }
-         return null;
-     };
-
-     final ScheduleState found = scheduleStates.find(byVersion).map(decode).first();
-     if (found == null) {
-         return null;
-     }
-     // based on version, to add content from collections of spoutSpecs/alertSpecs/etc..
-     return addDetailForScheduleState(found, versionId);
- }
-
- /**
-  * Fetches the basic ScheduleState that sorts first by descending {@code generateTime}, then
-  * uses its version to load every sub-part (spoutSpecs/alertSpecs/etc) and form a complete
-  * ScheduleState.
-  *
-  * @return the latest ScheduleState, or {@code null} when no state was obtained
-  */
- @Override
- public ScheduleState getScheduleState() {
-     final BsonDocument newestFirst = new BsonDocument();
-     newestFirst.append("generateTime", new BsonInt32(-1));
-
-     // A document that cannot be deserialized is logged and mapped to null.
-     final Function<Document, ScheduleState> decode = raw -> {
-         String json = raw.toJson();
-         try {
-             return mapper.readValue(json, ScheduleState.class);
-         } catch (IOException e) {
-             LOG.error("deserialize config item failed!", e);
-         }
-         return null;
-     };
-
-     final ScheduleState latest = scheduleStates.find().sort(newestFirst).map(decode).first();
-     if (latest == null) {
-         return null;
-     }
-     // based on version, to add content from collections of spoutSpecs/alertSpecs/etc..
-     return addDetailForScheduleState(latest, latest.getVersion());
- }
-
- /**
-  * Not implemented by this DAO.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public List<ScheduleState> listScheduleStates() {
-     final String reason = "listScheduleStates not support!";
-     throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Not implemented by this DAO.
-  *
-  * @param maxCapacity unused
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public OpResult clearScheduleState(int maxCapacity) {
-     final String reason = "clearScheduleState not support!";
-     throw new UnsupportedOperationException(reason);
- }
-
- private ScheduleState addDetailForScheduleState(ScheduleState state, String version) {
- Map<String, SpoutSpec> spoutMaps = maps(spoutSpecs, SpoutSpec.class, version);
- if (spoutMaps.size() != 0) {
- state.setSpoutSpecs(spoutMaps);
- }
-
- Map<String, AlertBoltSpec> alertMaps = maps(alertSpecs, AlertBoltSpec.class, version);
- if (alertMaps.size() != 0) {
- state.setAlertSpecs(alertMaps);
- }
-
- Map<String, RouterSpec> groupMaps = maps(groupSpecs, RouterSpec.class, version);
- if (groupMaps.size() != 0) {
- state.setGroupSpecs(groupMaps);
- }
-
- Map<String, PublishSpec> publishMaps = maps(publishSpecs, PublishSpec.class, version);
- if (publishMaps.size() != 0) {
- state.setPublishSpecs(publishMaps);
- }
-
- List<VersionedPolicyDefinition> policyLists = list(policySnapshots, VersionedPolicyDefinition.class, version);
- if (policyLists.size() != 0) {
- state.setPolicySnapshots(policyLists);
- }
-
- List<VersionedStreamDefinition> streamLists = list(streamSnapshots, VersionedStreamDefinition.class, version);
- if (streamLists.size() != 0) {
- state.setStreamSnapshots(streamLists);
- }
-
- List<MonitoredStream> monitorLists = list(monitoredStreams, MonitoredStream.class, version);
- if (monitorLists.size() != 0) {
- state.setMonitoredStreams(monitorLists);
- }
-
- List<PolicyAssignment> assignmentLists = list(assignments, PolicyAssignment.class, version);
- if (assignmentLists.size() != 0) {
- state.setAssignments(assignmentLists);
- }
- return state;
- }
-
- /**
-  * Loads every document of {@code collection} with the given version and indexes the
-  * deserialized objects by {@code topologyId} (for SpoutSpec) or {@code topologyName} (for
-  * any other class).
-  *
-  * <p>For SpoutSpec the three metadata arrays are first turned back into maps keyed by
-  * {@code topicName}, reversing the transformation applied when the spec was stored.
-  *
-  * @param collection the collection to query
-  * @param clz        the class to deserialize each document into
-  * @param version    value matched against the {@code version} field
-  * @return the deserialized objects by key; documents failing with an IOException are
-  *         logged and left out
-  */
- private <T> Map<String, T> maps(MongoCollection<Document> collection, Class<T> clz, String version) {
-     final BsonDocument versionFilter = new BsonDocument();
-     versionFilter.append("version", new BsonString(version));
-
-     final Map<String, T> byKey = new HashMap<String, T>();
-     final boolean isSpoutSpec = clz == SpoutSpec.class;
-     final String keyField = isSpoutSpec ? "topologyId" : "topologyName";
-
-     final Block<Document> collect = document -> {
-         String json = document.toJson();
-         try {
-             //Due to some field name in SpoutSpec contains dot(.) which is invalid Mongo Field name,
-             // we need to transform the format while reading from Mongo.
-             if (isSpoutSpec) {
-                 final Document restored = Document.parse(json);
-                 final String[] mapFields = {"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", "streamRepartitionMetadataMap"};
-                 for (String mapField : mapFields) {
-                     @SuppressWarnings("unchecked")
-                     final ArrayList<Document> entries = (ArrayList<Document>) restored.get(mapField);
-                     restored.remove(mapField);
-
-                     final Document keyed = new Document();
-                     for (Document entry : entries) {
-                         keyed.put((String) entry.get("topicName"), entry.get(mapField));
-                     }
-                     restored.put(mapField, keyed);
-                 }
-
-                 json = restored.toJson();
-             }
-             final T value = mapper.readValue(json, clz);
-             byKey.put(document.getString(keyField), value);
-         } catch (IOException e) {
-             LOG.error("deserialize config item failed!", e);
-         }
-     };
-     collection.find(versionFilter).forEach(collect);
-
-     return byKey;
- }
-
- /**
-  * Loads every document of {@code collection} with the given version and deserializes each
-  * one into {@code clz}.
-  *
-  * @param collection the collection to query
-  * @param clz        the class to deserialize each document into
-  * @param version    value matched against the {@code version} field
-  * @return the deserialized objects; a document that fails to deserialize is logged and
-  *         mapped to {@code null} (presumably ending up as a null entry - verify against
-  *         the driver's {@code into})
-  */
- private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz, String version) {
-     final BsonDocument versionFilter = new BsonDocument();
-     versionFilter.append("version", new BsonString(version));
-
-     final Function<Document, T> decode = raw -> {
-         String json = raw.toJson();
-         try {
-             return mapper.readValue(json, clz);
-         } catch (IOException e) {
-             LOG.error("deserialize config item failed!", e);
-         }
-         return null;
-     };
-
-     final List<T> items = new LinkedList<T>();
-     collection.find(versionFilter).map(decode).into(items);
-     return items;
- }
-
- /**
-  * Loads every document of {@code collection} and deserializes each one into {@code clz}.
-  *
-  * @param collection the collection to read
-  * @param clz        the class to deserialize each document into
-  * @return the deserialized objects; a document that fails to deserialize is logged and
-  *         mapped to {@code null} (presumably ending up as a null entry - verify against
-  *         the driver's {@code into})
-  */
- private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz) {
-     final Function<Document, T> decode = raw -> {
-         String json = raw.toJson();
-         try {
-             return mapper.readValue(json, clz);
-         } catch (IOException e) {
-             LOG.error("deserialize config item failed!", e);
-         }
-         return null;
-     };
-
-     final List<T> items = new LinkedList<T>();
-     collection.find().map(decode).into(items);
-     return items;
- }
-
- /**
-  * Writes a ScheduleState to several collections: the basic info goes to the schedule state
-  * collection, everything else goes to the collections named spoutSpecs/alertSpecs/etc.
-  *
-  * <p>The insert helpers report failure through their returned {@code OpResult} rather than
-  * by throwing. All writes are still attempted, as before, but the first failed write is now
-  * reported to the caller instead of an unconditional success.
-  *
-  * @param state the schedule state to persist
-  * @return code 200 when every write succeeded; otherwise code 400 with the message of the
-  *         first failed write, or of the exception that aborted the operation
-  */
- @Override
- public OpResult addScheduleState(ScheduleState state) {
-     OpResult result = new OpResult();
-     try {
-         // Outcome of every single insert, in write order.
-         List<OpResult> writes = new ArrayList<>();
-
-         for (SpoutSpec spoutSpec : state.getSpoutSpecs().values()) {
-             writes.add(addOneSpoutSpec(spoutSpec));
-         }
-
-         for (AlertBoltSpec alertBoltSpec : state.getAlertSpecs().values()) {
-             writes.add(addOne(alertSpecs, alertBoltSpec));
-         }
-
-         for (RouterSpec groupSpec : state.getGroupSpecs().values()) {
-             writes.add(addOne(groupSpecs, groupSpec));
-         }
-
-         for (PublishSpec publishSpec : state.getPublishSpecs().values()) {
-             writes.add(addOne(publishSpecs, publishSpec));
-         }
-
-         for (VersionedPolicyDefinition policySnapshot : state.getPolicySnapshots()) {
-             writes.add(addOne(policySnapshots, policySnapshot));
-         }
-
-         for (VersionedStreamDefinition streamSnapshot : state.getStreamSnapshots()) {
-             writes.add(addOne(streamSnapshots, streamSnapshot));
-         }
-
-         for (MonitoredStream monitoredStream : state.getMonitoredStreams()) {
-             writes.add(addOne(monitoredStreams, monitoredStream));
-         }
-
-         for (PolicyAssignment assignment : state.getAssignments()) {
-             writes.add(addOne(assignments, assignment));
-         }
-
-         ScheduleStateBase stateBase = new ScheduleStateBase(
-             state.getVersion(), state.getGenerateTime(), state.getCode(),
-             state.getMessage(), state.getScheduleTimeMillis());
-
-         writes.add(addOne(scheduleStates, stateBase));
-
-         result.code = 200;
-         result.message = "add document to collection schedule_specs succeed";
-         // Surface the first failed insert instead of claiming success.
-         for (OpResult write : writes) {
-             if (write.code != 200) {
-                 result.code = 400;
-                 result.message = write.message;
-                 break;
-             }
-         }
-     } catch (Exception e) {
-         result.code = 400;
-         result.message = e.getMessage();
-         LOG.error("", e);
-     }
-     return result;
- }
-
- /**
-  * Reads every document of the {@code assignments} collection as a {@link PolicyAssignment}.
-  *
-  * @return the deserialized policy assignments
-  */
- @Override
- public List<PolicyAssignment> listAssignments() {
-     final List<PolicyAssignment> all = list(assignments, PolicyAssignment.class);
-     return all;
- }
-
- /**
-  * Inserts the given assignment into the {@code assignments} collection through {@code addOne}.
-  *
-  * @param assignment the policy assignment to insert
-  * @return whatever {@code addOne} reports
-  */
- @Override
- public OpResult addAssignment(PolicyAssignment assignment) {
-     final OpResult outcome = addOne(assignments, assignment);
-     return outcome;
- }
-
- /**
-  * Reads every document of the {@code topologies} collection as a {@link Topology}.
-  *
-  * @return the deserialized topologies
-  */
- @Override
- public List<Topology> listTopologies() {
-     final List<Topology> all = list(topologies, Topology.class);
-     return all;
- }
-
- /**
-  * Hands the given topology to {@code addOrReplace} together with the {@code topologies}
-  * collection.
-  *
-  * @param t the topology to persist
-  * @return whatever {@code addOrReplace} reports
-  */
- @Override
- public OpResult addTopology(Topology t) {
-     final OpResult outcome = addOrReplace(this.topologies, t);
-     return outcome;
- }
-
- /**
-  * Hands the given name to {@code remove} together with the {@code topologies} collection.
-  *
-  * @param topologyName name passed through to {@code remove}
-  * @return whatever {@code remove} reports
-  */
- @Override
- public OpResult removeTopology(String topologyName) {
-     final OpResult outcome = remove(topologies, topologyName);
-     return outcome;
- }
-
- /**
-  * Not implemented by this DAO.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public OpResult clear() {
-     final String reason = "clear not support!";
-     throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Not implemented by this DAO.
-  *
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public Models export() {
-     final String reason = "export not support!";
-     throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Not implemented by this DAO.
-  *
-  * @param models unused
-  * @throws UnsupportedOperationException always
-  */
- @Override
- public OpResult importModels(Models models) {
-     final String reason = "importModels not support!";
-     throw new UnsupportedOperationException(reason);
- }
-
- /**
-  * Closes the {@code client} held by this DAO.
-  *
-  * @throws IOException declared by the signature; the visible body only calls
-  *                     {@code client.close()}
-  */
- @Override
- public void close() throws IOException {
-     this.client.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
deleted file mode 100644
index 2463e5b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.resource;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-/**
- * Plain holder of metadata collections, used for metadata export/import to ease testing.
- * Every field is public and starts out as an empty, mutable collection.
- *
- * @since May 23, 2016
- */
-public class Models {
-    public List<StreamingCluster> clusters = new ArrayList<>();
-    public List<StreamDefinition> schemas = new ArrayList<>();
-    public List<Kafka2TupleMetadata> datasources = new ArrayList<>();
-    public List<PolicyDefinition> policies = new ArrayList<>();
-    public List<Publishment> publishments = new ArrayList<>();
-    // Schedule states keyed by string, kept in sorted key order by the TreeMap.
-    public SortedMap<String, ScheduleState> scheduleStates = new TreeMap<>();
-    public List<PolicyAssignment> assignments = new ArrayList<>();
-    public List<Topology> topologies = new ArrayList<>();
-}