You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/06/02 07:07:59 UTC
[20/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen
alert engine code on branch-0.5
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
deleted file mode 100644
index ccb4624..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.eagle.alert.config.ConfigBusProducer;
-import org.apache.eagle.alert.config.ConfigValue;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
-import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
-import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since Mar 24, 2016 Coordinator is a standalone Java application which
- * listens to policy changes and uses a scheduling algorithm to distribute
- * policies: 1) reacting to shutdown events 2) starting a non-daemon thread to
- * pull policies and figure out whether policies have changed
- */
-public class Coordinator {
- private static final String COORDINATOR = "coordinator";
- /**
- * {@link ZKMetadataChangeNotifyService}
- * /alert/{topologyName}/spout
- * /router
- * /alert
- * /publisher
- */
- private static final String ZK_ALERT_CONFIG_SPOUT = "{0}/spout";
- private static final String ZK_ALERT_CONFIG_ROUTER = "{0}/router";
- private static final String ZK_ALERT_CONFIG_ALERT = "{0}/alert";
- private static final String ZK_ALERT_CONFIG_PUBLISHER = "{0}/publisher";
-
- private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
-
- private final static String METADATA_SERVICE_HOST = "metadataService.host";
- private final static String METADATA_SERVICE_PORT = "metadataService.port";
- private final static String METADATA_SERVICE_CONTEXT = "metadataService.context";
- private final static String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
- private final static String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
-
- private volatile ScheduleState currentState = null;
- private final ConfigBusProducer producer;
- private final IMetadataServiceClient client;
- private Config config;
-
- public Coordinator() {
- config = ConfigFactory.load().getConfig(COORDINATOR);
- ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
- producer = new ConfigBusProducer(zkConfig);
- client = new MetadataServiceClientImpl(config);
- }
-
- public Coordinator(Config config, ConfigBusProducer producer, IMetadataServiceClient client) {
- this.config = config;
- this.producer = producer;
- this.client = client;
- }
-
- public ScheduleState schedule(ScheduleOption option) {
- IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
- TopologyMgmtService mgmtService = new TopologyMgmtService();
- IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-
- scheduler.init(context, mgmtService);
- ScheduleState state = scheduler.schedule(option);
-
- // persist & notify
- postSchedule(client, state, producer);
-
- currentState = state;
- return state;
- }
-
- public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer producer) {
- // persist state
- client.addScheduleState(state);
- // TODO, see ScheduleState comments on how to better store these configs
- // store policy assignment
- // store monitored stream
-
- // notify
- ConfigValue value = new ConfigValue();
- value.setValue(state.getVersion());
- value.setValueVersionId(true);
- for (String topo : state.getSpoutSpecs().keySet()) {
- producer.send(MessageFormat.format(ZK_ALERT_CONFIG_SPOUT, topo), value);
- }
- for (String topo : state.getGroupSpecs().keySet()) {
- producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ROUTER, topo), value);
- }
- for (String topo : state.getAlertSpecs().keySet()) {
- producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ALERT, topo), value);
- }
- for (String topo : state.getPublishSpecs().keySet()) {
- producer.send(MessageFormat.format(ZK_ALERT_CONFIG_PUBLISHER, topo), value);
- }
- }
-
- public ScheduleState getState() {
- return currentState;
- }
-
- /**
- * shutdown background threads and release various resources
- */
- private static class CoordinatorShutdownHook implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(CoordinatorShutdownHook.class);
- private ScheduledExecutorService executorSrv;
-
- public CoordinatorShutdownHook(ScheduledExecutorService executorSrv) {
- this.executorSrv = executorSrv;
- }
-
- @Override
- public void run() {
- LOG.info("start shutdown coordinator ...");
- LOG.info("Step 1 shutdown dynamic policy loader thread ");
- // we should catch every exception to make best effort for clean
- // shutdown
- try {
- executorSrv.shutdown();
- executorSrv.awaitTermination(2000, TimeUnit.MILLISECONDS);
- } catch (Throwable t) {
- LOG.error("error shutdown dynamic policy loader", t);
- } finally {
- executorSrv.shutdownNow();
- }
- }
- }
-
- private static class PolicyChangeHandler implements PolicyChangeListener {
- private final static Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class);
- private Config config;
- private IMetadataServiceClient client;
-
- public PolicyChangeHandler(Config config) {
- this.config = config;
- this.client = new MetadataServiceClientImpl(config);
- }
-
- @Override
- public void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies,
- Collection<String> removedPolicies, Collection<String> modifiedPolicies) {
- LOG.info("policy changed ... ");
- LOG.info("allPolicies: " + allPolicies + ", addedPolicies: " + addedPolicies + ", removedPolicies: "
- + removedPolicies + ", modifiedPolicies: " + modifiedPolicies);
-
- IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
- TopologyMgmtService mgmtService = new TopologyMgmtService();
- IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-
- scheduler.init(context, mgmtService);
-
- ScheduleState state = scheduler.schedule(new ScheduleOption());
-
- ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config));
- postSchedule(client, state, producer);
- producer.send("spout", new ConfigValue());
- }
- }
-
- public static void main(String[] args) throws Exception {
- Config config = ConfigFactory.load().getConfig(COORDINATOR);
- // build dynamic policy loader
- String host = config.getString(METADATA_SERVICE_HOST);
- int port = config.getInt(METADATA_SERVICE_PORT);
- String context = config.getString(METADATA_SERVICE_CONTEXT);
- IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
- DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
- loader.addPolicyChangeListener(new PolicyChangeHandler(config));
-
- // schedule dynamic policy loader
- long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
- long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
- ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1);
- scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
- Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
- LOG.info("Eagle Coordinator started ...");
-
- Thread.currentThread().join();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
deleted file mode 100644
index 3458b3e..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-/**
- * @since Apr 22, 2016
- *
- */
-public class CoordinatorConstants {
- public static final String CONFIG_ITEM_COORDINATOR = "coordinator";
- public static final String CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND = "topologyLoadUpbound";
- public static final String CONFIG_ITEM_BOLT_LOAD_UPBOUND = "boltLoadUpbound";
- public static final String POLICY_DEFAULT_PARALLELISM = "policyDefaultParallelism";
- public static final String BOLT_PARALLELISM = "boltParallelism";
- public static final String NUM_OF_ALERT_BOLTS_PER_TOPOLOGY = "numOfAlertBoltsPerTopology";
- public static final String POLICIES_PER_BOLT = "policiesPerBolt";
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
deleted file mode 100644
index 5e61443..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-
-/**
- * @since Mar 24, 2016
- *
- */
-public interface IPolicyScheduler {
-
- void init(IScheduleContext context, TopologyMgmtService mgmtService);
-
- /**
- * Build the assignments for all.
- */
- ScheduleState schedule(ScheduleOption option);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
deleted file mode 100644
index 0cde22d..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * @since Mar 28, 2016
- *
- */
-public interface IScheduleContext {
-
- Map<String, Topology> getTopologies();
-
- Map<String, PolicyDefinition> getPolicies();
-
- // data source
- Map<String, Kafka2TupleMetadata> getDataSourceMetadata();
-
- Map<String, StreamDefinition> getStreamSchemas();
-
- Map<String, TopologyUsage> getTopologyUsages();
-
- Map<String, PolicyAssignment> getPolicyAssignments();
-
- Map<StreamGroup, MonitoredStream> getMonitoredStreams();
-
- Map<String, Publishment> getPublishments();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
deleted file mode 100644
index 8bccb53..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
-
-/**
- * @since Mar 24, 2016
- *
- */
-public class PolicySchedulerFactory {
-
- public static IPolicyScheduler createScheduler() {
- return new GreedyPolicyScheduler();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
deleted file mode 100644
index 6c04e61..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-/**
- * A runtime option for one schedule processing.
- *
- * Could be used for configuration override.
- *
- * @since Apr 19, 2016
- *
- */
-public class ScheduleOption {
- private int policiesPerBolt;
- private int boltParallelism;
- private int policyDefaultParallelism;
- private double boltLoadUpbound;
- private double topoLoadUpbound;
-
- public int getPoliciesPerBolt() {
- return policiesPerBolt;
- }
-
- public void setPoliciesPerBolt(int policiesPerBolt) {
- this.policiesPerBolt = policiesPerBolt;
- }
-
- public int getBoltParallelism() {
- return boltParallelism;
- }
-
- public void setBoltParallelism(int boltParallelism) {
- this.boltParallelism = boltParallelism;
- }
-
- public int getPolicyDefaultParallelism() {
- return policyDefaultParallelism;
- }
-
- public void setPolicyDefaultParallelism(int policyDefaultParallelism) {
- this.policyDefaultParallelism = policyDefaultParallelism;
- }
-
- public double getBoltLoadUpbound() {
- return boltLoadUpbound;
- }
-
- public void setBoltLoadUpbound(double boltLoadUpbound) {
- this.boltLoadUpbound = boltLoadUpbound;
- }
-
- public double getTopoLoadUpbound() {
- return topoLoadUpbound;
- }
-
- public void setTopoLoadUpbound(double topoLoadUpbound) {
- this.topoLoadUpbound = topoLoadUpbound;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
deleted file mode 100644
index 4ae07f5..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.NUM_OF_ALERT_BOLTS_PER_TOPOLOGY;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since Mar 29, 2016
- *
- */
-public class TopologyMgmtService {
-
- public static class TopologyMeta {
- public String topologyId;
- public Topology topology;
- public TopologyUsage usage;
-
- public String clusterId;
- public String nimbusHost;
- public String nimbusPort;
-
- }
-
- public static class StormClusterMeta {
- public String clusterId;
- public String nimbusHost;
- public String nimbusPort;
- public String stormVersion;
- }
-
- @SuppressWarnings("unused")
- private int boltParallelism = 0;
- private int numberOfBoltsPerTopology = 0;
-
- public TopologyMgmtService() {
- Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
- boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM);
- numberOfBoltsPerTopology = config.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY);
- }
-
- public int getNumberOfAlertBoltsInTopology() {
- return numberOfBoltsPerTopology;
- }
-
- /**
- * TODO: call topology mgmt API to create a topology
- *
- * @return
- */
- public TopologyMeta creatTopology() {
- // TODO
- throw new UnsupportedOperationException("not supported yet!");
- }
-
- public List<TopologyMeta> listTopologies() {
- // TODO
- return Collections.emptyList();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
deleted file mode 100644
index a9b6c00..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_BOLT_LOAD_UPBOUND;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICIES_PER_BOLT;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICY_DEFAULT_PARALLELISM;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IPolicyScheduler;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * A simple greedy assigner. <br/>
- * A greedy assigner simply loops over the policies, first finds the most suitable
- * topology to host each policy, then assigns the topics to the corresponding
- * spouts/group-by bolts.
- *
- * <br/>
- * For each given policy, the greedy steps are
- * <ul>
- * <li>1. Find the same topology that already serves the policy without exceeding the load</li>
- * <li>2. Find a topology that already takes the source traffic without exceeding the load</li>
- * <li>3. Find a topology that is available to place the source topic without exceeding the load</li>
- * <li>4. Create a new topology and locate the policy</li>
- * <li>Route table generated after all policies assigned</li>
- * </ul>
- * <br/>
- *
- * @since Mar 24, 2016
- *
- */
-public class GreedyPolicyScheduler implements IPolicyScheduler {
-
- private static final Logger LOG = LoggerFactory.getLogger(GreedyPolicyScheduler.class);
-
- private int policiesPerBolt;
- private int policyDefaultParallelism;
- private int initialQueueSize;
- private double boltLoadUpbound;
-
- // copied context for scheduling
- private IScheduleContext context;
-
- private TopologyMgmtService mgmtService;
-
- private ScheduleState state;
-
- public GreedyPolicyScheduler() {
- Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
- policiesPerBolt = config.getInt(POLICIES_PER_BOLT);
- policyDefaultParallelism = config.getInt(POLICY_DEFAULT_PARALLELISM);
- initialQueueSize = policyDefaultParallelism;
- boltLoadUpbound = config.getDouble(CONFIG_ITEM_BOLT_LOAD_UPBOUND);
- }
-
- /**
-  * Schedules every policy that has no assignment yet, then derives the
-  * schedule state from the resulting placement and remembers it.
-  *
-  * @param option schedule option; not consulted by this implementation
-  * @return the newly calculated schedule state (also available through getState())
-  */
- public synchronized ScheduleState schedule(ScheduleOption option) {
-     // FIXME: never re-assign right now: sticky mode
-     // TODO: how to identify the over-heat nodes? not yet done #Scale of policies
-     // Answer: Use configured policiesPerBolt and configured bolt load up-bound
-     // FIXME: Here could be strategy to define the topology priorities
-     List<WorkItem> workSets = findWorkingSets();
-     /*
-      * Open design questions:
-      *
-      * - How to support taking multiple "dumped" topologies that consume the same input as one available set?
-      *   Answer: spout spec generated after policy assigned
-      * - How to define the input traffic partition?
-      *   Answer: input traffic might not be supported right now.
-      * - How to support traffic partition between topologies?
-      *   Answer: two possible place: a global route table will be generated, those target not in current
-      *   topology tuples will be dropped. This make the partition for tuple to alert
-      * - How to support adding topology on demand by evaluating the available topology bandwidth
-      *   (need topology level load)?
-      *   Answer: Use configured topology load up-bound, when topology load is available, will adopt
-      */
-     Map<String, PolicyAssignment> newAssignments = new HashMap<String, PolicyAssignment>();
-     for (WorkItem item : workSets) {
-         // the per-policy outcome is logged by schedulePolicy itself; it was previously
-         // collected into a list that nothing ever read
-         schedulePolicy(item, newAssignments);
-     }
-
-     state = generateMonitorMetadata(workSets, newAssignments);
-     if (LOG.isDebugEnabled()) {
-         LOG.debug("calculated schedule state: {}", JsonUtils.writeValueAsString(state));
-     }
-     return state;
- }
-
- private List<WorkItem> findWorkingSets() {
- // find the unassigned definition
- List<WorkItem> workSets = new LinkedList<WorkItem>();
- for (PolicyDefinition def : context.getPolicies().values()) {
- int expectParal = def.getParallelismHint();
- if (expectParal == 0) {
- expectParal = policyDefaultParallelism;
- }
- // how to handle expand of an policy in a smooth transition manner
- PolicyAssignment assignment = context.getPolicyAssignments().get(def.getName());
- if (assignment != null) {
- LOG.info("policy {} already allocated", def.getName());
- continue;
- }
-
- WorkItem item = new WorkItem(def, expectParal);
- workSets.add(item);
- }
- LOG.info("work set calculation: {}", workSets);
- return workSets;
- }
-
- /**
-  * Delegates to a MonitorMetadataGenerator built on the scheduler's context.
-  * The newAssignments argument is accepted but not consumed here.
-  */
- private ScheduleState generateMonitorMetadata(List<WorkItem> expandworkSets,
-         Map<String, PolicyAssignment> newAssignments) {
-     return new MonitorMetadataGenerator(context).generate(expandworkSets);
- }
-
- private void placePolicy(PolicyDefinition def, AlertBoltUsage alertBoltUsage, Topology targetTopology,
- TopologyUsage usage) {
- String policyName = def.getName();
-
- // topology usage update
- alertBoltUsage.addPolicies(def);
-
- // update alert policy
- usage.getPolicies().add(policyName);
-
- // update source topics
- updateDataSource(usage, def);
-
- // update group-by
- updateGrouping(usage, def);
- }
-
- private void updateGrouping(TopologyUsage usage, PolicyDefinition def) {
- // Intentionally a no-op: groupByMeta was removed because group spec generation no longer needs it.
- // The previous implementation is kept below, commented out, for reference.
-// List<StreamPartition> policyPartitionSpec = def.getPartitionSpec();
-// Map<String, List<StreamPartition>> groupByMeta = usage.getGroupByMeta();
-// for (StreamPartition par : policyPartitionSpec) {
-// List<StreamPartition> partitions = groupByMeta.get(par.getStreamId());
-// if (partitions == null) {
-// partitions = new ArrayList<StreamPartition>();
-// // de-dup of the partition on the list?
-// groupByMeta.put(par.getStreamId(), partitions);
-// }
-// if (!partitions.contains(par)) {
-// partitions.add(par);
-// }
-// }
- }
-
- private void updateDataSource(TopologyUsage usage, PolicyDefinition def) {
- List<String> datasources = findDatasource(def);
- usage.getDataSources().addAll(datasources);
- }
-
- /**
-  * Resolves the data source of every input stream of the given policy.
-  * Input streams that have no stream definition in the context are logged
-  * and skipped instead of failing the whole scheduling round.
-  *
-  * @param def the policy whose input streams are resolved
-  * @return the data sources of all input streams that could be resolved
-  */
- private List<String> findDatasource(PolicyDefinition def) {
-     List<String> result = new ArrayList<String>();
-
-     Map<String, StreamDefinition> schemaMaps = context.getStreamSchemas();
-     for (String inputStream : def.getInputStreams()) {
-         StreamDefinition schema = schemaMaps.get(inputStream);
-         if (schema == null) {
-             // a policy may reference a stream that is not (or no longer) defined
-             LOG.warn("policy {} references input stream {} without stream definition, skipped",
-                     def.getName(), inputStream);
-             continue;
-         }
-         result.add(schema.getDataSource());
-     }
-     return result;
- }
-
- /**
-  * For each given policy, the greedy steps are
-  * <ul>
-  * <li>1. Find the same topology that already serve the policy</li>
-  * <li>2. Find the topology that already take the source traffic</li>
-  * <li>3. Find the topology that available to place source topic</li>
-  * <li>4. Create a new topology and locate the policy</li>
-  * <li>Route table generated after all policies assigned</li>
-  * </ul>
-  * <br/>
-  *
-  * @param item the policy to schedule
-  * @param newAssignments receives the assignment created for the policy, keyed by policy name
-  * @return the result with code 200 on success, or 400 when the policy has no partition
-  *         spec or no work queue could be allocated
-  */
- private ScheduleResult schedulePolicy(WorkItem item, Map<String, PolicyAssignment> newAssignments) {
-     LOG.info(" schedule for {}", item );
-
-     String policyName = item.def.getName();
-     ScheduleResult result = new ScheduleResult();
-     result.policyName = policyName;
-
-     // a missing partition spec is handled like an empty one instead of raising an NPE
-     if (item.def.getPartitionSpec() == null || item.def.getPartitionSpec().isEmpty()) {
-         LOG.error(" policy {} partition spec is empty! ", policyName);
-         result.code = 400;
-         result.message = "policy doesn't have partition spec";
-         return result;
-     }
-
-     StreamGroup policyStreamPartition = new StreamGroup();
-     policyStreamPartition.addStreamPartitions(item.def.getPartitionSpec());
-
-     // look up, or register on first use, the monitored stream of this stream group
-     MonitoredStream targetdStream = context.getMonitoredStreams().get(policyStreamPartition);
-     if (targetdStream == null) {
-         targetdStream = new MonitoredStream(policyStreamPartition);
-         context.getMonitoredStreams().put(policyStreamPartition, targetdStream);
-     }
-
-     StreamWorkSlotQueue queue = findWorkSlotQueue(targetdStream, item.def);
-     if (queue == null) {
-         result.code = 400;
-         result.message = String.format("unable to allocate work queue resource for policy %s !", policyName);
-     } else {
-         placePolicyToQueue(item.def, queue, newAssignments);
-         result.code = 200;
-         result.message = "OK";
-     }
-
-     LOG.info(" schedule result : {}", result);
-     return result;
- }
-
- private void placePolicyToQueue(PolicyDefinition def, StreamWorkSlotQueue queue,
- Map<String, PolicyAssignment> newAssignments) {
- for (WorkSlot slot : queue.getWorkingSlots()) {
- Topology targetTopology = context.getTopologies().get(slot.getTopologyName());
- TopologyUsage usage = context.getTopologyUsages().get(slot.getTopologyName());
- AlertBoltUsage alertBoltUsage = usage.getAlertBoltUsage(slot.getBoltId());
- placePolicy(def, alertBoltUsage, targetTopology, usage);
- }
-// queue.placePolicy(def);
- PolicyAssignment assignment = new PolicyAssignment(def.getName(), queue.getQueueId());
- context.getPolicyAssignments().put(def.getName(), assignment);
- newAssignments.put(def.getName(), assignment);
- }
-
- private StreamWorkSlotQueue findWorkSlotQueue(MonitoredStream targetdStream, PolicyDefinition def) {
- StreamWorkSlotQueue targetQueue = null;
- for (StreamWorkSlotQueue queue : targetdStream.getQueues()) {
- if (isQueueAvailable(queue, def)) {
- targetQueue = queue;
- break;
- }
- }
-
- if (targetQueue == null) {
- WorkQueueBuilder builder = new WorkQueueBuilder(context, mgmtService);
- // TODO : get the properties from policy definiton
- targetQueue = builder.createQueue(targetdStream, false, getQueueSize(def.getParallelismHint()),
- new HashMap<String, Object>());
- }
- return targetQueue;
- }
-
- /**
- * Returns the number of work slots to request for a newly created queue.
- * Placeholder for a sizing strategy of the queue builder: the hint is
- * currently ignored and the initial queue size is always returned.
- *
- * @param hint the parallelism hint of the policy (not consulted at the moment)
- * @return the initial queue size
- */
- private int getQueueSize(int hint) {
- return initialQueueSize;
- }
-
- /**
-  * Checks whether an existing queue can take one more policy.
-  *
-  * @param queue the candidate queue
-  * @param def the policy to place
-  * @return true only when the queue is at least as large as the policy's parallelism
-  *         hint and every bolt behind its slots is available for the policy
-  */
- private boolean isQueueAvailable(StreamWorkSlotQueue queue, PolicyDefinition def) {
-     if (queue.getQueueSize() < def.getParallelismHint()) {
-         return false;
-     }
-
-     for (WorkSlot slot : queue.getWorkingSlots()) {
-         TopologyUsage topologyUsage = context.getTopologyUsages().get(slot.getTopologyName());
-         if (topologyUsage == null) {
-             // no usage record for the slot's topology: treat it like a missing bolt
-             // usage (unavailable) instead of failing with an NPE
-             return false;
-         }
-         if (!isBoltAvailable(topologyUsage.getAlertBoltUsage(slot.getBoltId()), def)) {
-             return false;
-         }
-     }
-     return true;
- }
-
- /**
-  * A bolt is unavailable when its usage is unknown, its load is above the
-  * configured bound, it holds more policies than configured, or it already
-  * hosts the given policy.
-  */
- private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) {
-     if (boltUsage == null) {
-         return false;
-     }
-     if (boltUsage.getLoad() > boltLoadUpbound) {
-         return false;
-     }
-     // NOTE(review): the strict '>' lets a bolt already holding exactly policiesPerBolt
-     // policies accept one more - confirm whether that is intended
-     if (boltUsage.getPolicies().size() > policiesPerBolt) {
-         return false;
-     }
-     return !boltUsage.getPolicies().contains(def.getName());
- }
-
- /**
- * Prepares the scheduler for use. The given context is wrapped into a new
- * InMemScheduleConext and the scheduler works on that wrapper from then on.
- *
- * @param context the schedule context to start from
- * @param mgmtService the topology management service handed to the work queue builder
- */
- public void init(IScheduleContext context, TopologyMgmtService mgmtService) {
- this.context = new InMemScheduleConext(context);
- this.mgmtService = mgmtService;
- }
-
- /**
- * @return the context the scheduler works on (the wrapper created by init)
- */
- public IScheduleContext getContext() {
- return context;
- }
-
- /**
- * @return the state stored by the most recent schedule call
- */
- public ScheduleState getState() {
- return state;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
deleted file mode 100644
index 40f16e9..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since Apr 26, 2016
- * Given current policy placement, figure out monitor metadata
- *
- * TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create....
- * FIXME: too many duplicated code logic : check null; add list to map; add to list..
- */
-public class MonitorMetadataGenerator {
-
- private static final Logger LOG = LoggerFactory.getLogger(MonitorMetadataGenerator.class);
-
- private IScheduleContext context;
-
- /**
- * @param context the schedule context all metadata is generated from
- */
- public MonitorMetadataGenerator(IScheduleContext context) {
- this.context = context;
- }
-
- /**
-  * Builds the spout, router, alert bolt and publish specs from the context
-  * and bundles them into a schedule state under a fresh version string.
-  * The expandworkSets argument is accepted but not consumed here.
-  */
- public ScheduleState generate(List<WorkItem> expandworkSets) {
-     // topologyId -> SpoutSpec
-     final Map<String, SpoutSpec> spoutSpecs = generateSpoutMonitorMetadata();
-     // grp-by meta spec(sort & grp)
-     final Map<String, RouterSpec> routerSpecs = generateGroupbyMonitorMetadata();
-     // alert bolt spec
-     final Map<String, AlertBoltSpec> alertBoltSpecs = generateAlertMonitorMetadata();
-     final Map<String, PublishSpec> publishSpecs = generatePublishMetadata();
-
-     return new ScheduleState(generateVersion(),
-             spoutSpecs,
-             routerSpecs,
-             alertBoltSpecs,
-             publishSpecs,
-             context.getPolicyAssignments().values(),
-             context.getMonitoredStreams().values(),
-             context.getPolicies().values(),
-             context.getStreamSchemas().values());
- }
-
- /**
-  * Builds one publish spec per topology usage, containing the publishments
-  * of every policy of that topology that still has a definition.
-  */
- private Map<String, PublishSpec> generatePublishMetadata() {
-     Map<String, PublishSpec> specsByTopology = new HashMap<String, PublishSpec>();
-
-     // index first: policy id -> publishments that reference it
-     Map<String, List<Publishment>> pubsByPolicy = new HashMap<String, List<Publishment>>();
-     for (Publishment publishment : context.getPublishments().values()) {
-         for (String policyId : publishment.getPolicyIds()) {
-             List<Publishment> bucket = pubsByPolicy.get(policyId);
-             if (bucket == null) {
-                 bucket = new ArrayList<>();
-                 pubsByPolicy.put(policyId, bucket);
-             }
-             bucket.add(publishment);
-         }
-     }
-
-     // then one spec per topology
-     for (TopologyUsage topoUsage : context.getTopologyUsages().values()) {
-         PublishSpec spec = specsByTopology.get(topoUsage.getTopoName());
-         if (spec == null) {
-             spec = new PublishSpec(topoUsage.getTopoName(),
-                     context.getTopologies().get(topoUsage.getTopoName()).getPubBoltId());
-             specsByTopology.put(topoUsage.getTopoName(), spec);
-         }
-
-         for (String policyName : topoUsage.getPolicies()) {
-             // policies without a definition are skipped
-             if (context.getPolicies().get(policyName) == null) {
-                 continue;
-             }
-             if (!pubsByPolicy.containsKey(policyName)) {
-                 continue;
-             }
-             for (Publishment publishment : pubsByPolicy.get(policyName)) {
-                 spec.addPublishment(publishment);
-             }
-         }
-     }
-     return specsByTopology;
- }
-
- /**
- * FIXME: add auto-increment version number?
- */
- private String generateVersion() {
- return "spec_version_" + System.currentTimeMillis();
- }
-
- /**
-  * Builds one alert bolt spec per topology usage, listing for every alert
-  * bolt the policies placed on it. A policy that is recorded on a bolt but
-  * has no definition in the context is logged and skipped, in line with how
-  * the publish metadata treats such policies.
-  *
-  * @return topology name -> alert bolt spec
-  */
- private Map<String, AlertBoltSpec> generateAlertMonitorMetadata() {
-     Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>();
-     for (TopologyUsage u : context.getTopologyUsages().values()) {
-         AlertBoltSpec alertSpec = alertSpecs.get(u.getTopoName());
-         if (alertSpec == null) {
-             alertSpec = new AlertBoltSpec(u.getTopoName());
-             alertSpecs.put(u.getTopoName(), alertSpec);
-         }
-         for (AlertBoltUsage boltUsage : u.getAlertUsages().values()) {
-             for (String policyName : boltUsage.getPolicies()) {
-                 PolicyDefinition definition = context.getPolicies().get(policyName);
-                 if (definition == null) {
-                     // previously dereferenced unconditionally, which raised an NPE here
-                     LOG.warn("policy {} placed on bolt {} has no definition, skipped",
-                             policyName, boltUsage.getBoltId());
-                     continue;
-                 }
-                 alertSpec.addBoltPolicy(boltUsage.getBoltId(), definition.getName());
-             }
-         }
-     }
-     return alertSpecs;
- }
-
- /**
-  * Builds one router spec per topology usage: every stream partition of every
-  * monitored stream gets a stream router spec carrying one worker queue per
-  * work slot queue of that monitored stream.
-  */
- private Map<String, RouterSpec> generateGroupbyMonitorMetadata() {
-     Map<String, RouterSpec> routerSpecs = new HashMap<String, RouterSpec>();
-     for (TopologyUsage topoUsage : context.getTopologyUsages().values()) {
-         RouterSpec routerSpec = routerSpecs.get(topoUsage.getTopoName());
-         if (routerSpec == null) {
-             routerSpec = new RouterSpec(topoUsage.getTopoName());
-             routerSpecs.put(topoUsage.getTopoName(), routerSpec);
-         }
-
-         for (MonitoredStream monitored : topoUsage.getMonitoredStream()) {
-             // mutiple stream on the same policy group : for correlation group case:
-             for (StreamPartition streamPartition : monitored.getStreamGroup().getStreamPartitions()) {
-                 StreamRouterSpec streamRoute = new StreamRouterSpec();
-                 streamRoute.setPartition(streamPartition);
-                 streamRoute.setStreamId(streamPartition.getStreamId());
-
-                 for (StreamWorkSlotQueue slotQueue : monitored.getQueues()) {
-                     PolicyWorkerQueue workerQueue = new PolicyWorkerQueue();
-                     workerQueue.setWorkers(slotQueue.getWorkingSlots());
-                     workerQueue.setPartition(streamPartition);
-                     streamRoute.addQueue(workerQueue);
-                 }
-
-                 routerSpec.addRouterSpec(streamRoute);
-             }
-         }
-     }
-
-     return routerSpecs;
- }
-
- /**
-  * Builds one spout spec per topology usage from the data sources the
-  * topology consumes and the partition specs of the policies placed on it.
-  * Entries whose referenced metadata is missing from the context (data
-  * source, policy definition, assignment, work queue or stream schema) are
-  * logged and skipped instead of aborting the whole generation with an NPE.
-  *
-  * @return topology name -> spout spec
-  */
- private Map<String, SpoutSpec> generateSpoutMonitorMetadata() {
-     Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap();
-
-     Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<String, SpoutSpec>();
-     // streamName -> StreamDefinition
-     Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas();
-     Map<String, Kafka2TupleMetadata> datasourcesMap = context.getDataSourceMetadata();
-     for (TopologyUsage usage : context.getTopologyUsages().values()) {
-         Topology topo = context.getTopologies().get(usage.getTopoName());
-
-         // based on data source schemas
-         // generate topic -> Kafka2TupleMetadata
-         // generate topic -> Tuple2StreamMetadata (actually the schema selector)
-         Map<String, Kafka2TupleMetadata> dss = new HashMap<String, Kafka2TupleMetadata>();
-         Map<String, Tuple2StreamMetadata> tss = new HashMap<String, Tuple2StreamMetadata>();
-         for (String dataSourceId : usage.getDataSources()) {
-             Kafka2TupleMetadata ds = datasourcesMap.get(dataSourceId);
-             if (ds == null) {
-                 LOG.error(" can not find metadata for data source {} ! ", dataSourceId);
-                 continue;
-             }
-             dss.put(ds.getTopic(), ds);
-             tss.put(ds.getTopic(), ds.getCodec());
-         }
-
-         // generate topicId -> StreamRepartitionMetadata
-         Map<String, List<StreamRepartitionMetadata>> streamsMap = new HashMap<String, List<StreamRepartitionMetadata>>();
-         for (String policyName : usage.getPolicies()) {
-             PolicyDefinition def = context.getPolicies().get(policyName);
-             if (def == null) {
-                 LOG.error(" can not find definition for policy {} ! ", policyName);
-                 continue;
-             }
-
-             PolicyAssignment assignment = context.getPolicyAssignments().get(policyName);
-             if (assignment == null) {
-                 LOG.error(" can not find assignment for policy {} ! ", policyName);
-                 continue;
-             }
-
-             // looked up once per policy; the queue does not depend on the partition
-             StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId());
-             if (queue == null) {
-                 LOG.error(" can not find work queue {} for policy {} ! ", assignment.getQueueId(), policyName);
-                 continue;
-             }
-
-             for (StreamPartition policyStreamPartition : def.getPartitionSpec()) {
-                 String stream = policyStreamPartition.getStreamId();
-                 StreamDefinition schema = streamSchemaMap.get(stream);
-                 Kafka2TupleMetadata source = (schema == null) ? null : datasourcesMap.get(schema.getDataSource());
-                 if (source == null) {
-                     LOG.error(" can not find schema or data source for stream {} of policy {} ! ", stream, policyName);
-                     continue;
-                 }
-                 String topic = source.getTopic();
-
-                 // add stream name to tuple metadata
-                 if (tss.containsKey(topic)) {
-                     Tuple2StreamMetadata tupleMetadata = tss.get(topic);
-                     tupleMetadata.getActiveStreamNames().add(stream);
-                 }
-
-                 // grouping strategy
-                 StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
-                 gs.partition = policyStreamPartition;
-                 gs.numTotalParticipatingRouterBolts = queue.getNumberOfGroupBolts();
-                 gs.startSequence = queue.getTopologyGroupStartIndex(topo.getName());
-                 gs.totalTargetBoltIds = new ArrayList<String>(topo.getGroupNodeIds());
-
-                 // add to map
-                 addGroupingStrategy(streamsMap, stream, schema, topic, schema.getDataSource(), gs);
-             }
-         }
-
-         SpoutSpec spoutSpec = new SpoutSpec(topo.getName(), streamsMap, tss, dss);
-         topoSpoutSpecsMap.put(topo.getName(), spoutSpec);
-     }
-     return topoSpoutSpecsMap;
- }
-
- /**
- * Work queue not a root level object, thus we need to build a map from
- * MonitoredStream for later quick lookup
- *
- * @return
- */
- private Map<String, StreamWorkSlotQueue> buildQueueMap() {
- Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
- for (MonitoredStream ms : context.getMonitoredStreams().values()) {
- for (StreamWorkSlotQueue queue : ms.getQueues()) {
- queueMap.put(queue.getQueueId(), queue);
- }
- }
- return queueMap;
- }
-
- private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream,
- StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
- List<StreamRepartitionMetadata> dsStreamMeta;
- if (streamsMap.containsKey(topicName)) {
- dsStreamMeta = streamsMap.get(topicName);
- } else {
- dsStreamMeta = new ArrayList<StreamRepartitionMetadata>();
- streamsMap.put(topicName, dsStreamMeta);
- }
- StreamRepartitionMetadata targetSm = null;
- for (StreamRepartitionMetadata sm : dsStreamMeta) {
- if (stream.equalsIgnoreCase(sm.getStreamId())) {
- targetSm = sm;
- break;
- }
- }
- if (targetSm == null) {
- targetSm = new StreamRepartitionMetadata(datasourceName, schema.getStreamId());
- dsStreamMeta.add(targetSm);
- }
- if (!targetSm.groupingStrategies.contains(gs)) {
- targetSm.addGroupStrategy(gs);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
deleted file mode 100644
index ea96d79..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-/**
- * Schedule result for one policy
- *
- *
- * @since Apr 26, 2016
- *
- */
-public class ScheduleResult {
-    // outcome code set by the scheduler: 200 on success, 400 on failure
-    int code;
-    // human readable explanation of the outcome
-    String message;
-    String policyName;
-    StreamPartition partition;
-    int index;
-    List<PolicyAssignment> topoliciesScheduled;
-
-    /**
-     * Renders policy name, result code and message. The message used to be
-     * passed to the formatter without a matching placeholder, so it never
-     * appeared in the output.
-     */
-    @Override
-    public String toString() {
-        return String.format("policy: %s, result code: %d, message: %s", policyName, code, message);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
deleted file mode 100644
index baa489d..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-public class WorkItem {
- public final PolicyDefinition def;
- public final int requestParallelism;
-
- public WorkItem(PolicyDefinition def, int workNum) {
- this.def = def;
- this.requestParallelism = workNum;
- }
-
- public String toString() {
- return "policy name: " + def.getName() + "(" + requestParallelism + ")";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
deleted file mode 100644
index a32b8fb..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.impl.strategies.IWorkSlotStrategy;
-import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since Apr 27, 2016
- *
- */
-public class WorkQueueBuilder {
-
- private static final Logger LOG = LoggerFactory.getLogger(WorkQueueBuilder.class);
-
- private final IScheduleContext context;
- private final TopologyMgmtService mgmtService;
-
- public WorkQueueBuilder(IScheduleContext context, TopologyMgmtService mgmtService) {
- this.context = context;
- this.mgmtService = mgmtService;
- }
-
- public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size,
- Map<String, Object> properties) {
- // FIXME: make extensible and configurable
- IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
- List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
- if (slots.size() < size) {
- LOG.error("allocat stream work queue failed, required size");
- return null;
- }
- StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
- slots);
- calculateGroupIndexAndCount(queue);
- assignQueueSlots(stream, queue);// build reverse reference
- stream.addQueues(queue);
-
- return queue;
- }
-
- private void assignQueueSlots(MonitoredStream stream, StreamWorkSlotQueue queue) {
- for (WorkSlot slot : queue.getWorkingSlots()) {
- TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName());
- AlertBoltUsage boltUsage = u.getAlertBoltUsage(slot.getBoltId());
- boltUsage.addQueue(stream.getStreamGroup(), queue);
- u.addMonitoredStream(stream);
- }
- }
-
- private void calculateGroupIndexAndCount(StreamWorkSlotQueue queue) {
- Map<String, Integer> result = new HashMap<String, Integer>();
- int total = 0;
- for (WorkSlot slot : queue.getWorkingSlots()) {
- if (result.containsKey(slot.getTopologyName())) {
- continue;
- }
- result.put(slot.getTopologyName(), total);
- total += context.getTopologies().get(slot.getTopologyName()).getNumOfGroupBolt();
- }
-
- queue.setNumberOfGroupBolts(total);
- queue.setTopoGroupStartIndex(result);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
deleted file mode 100644
index 28df3c4..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl.strategies;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-
-/**
- * Strategy contract for reserving the {@link WorkSlot}s requested by a caller.
- *
- * @since Apr 27, 2016
- *
- */
-public interface IWorkSlotStrategy {
-
- /**
-  * Reserves work slots.
-  *
-  * @param size the number of work slots requested
-  * @param isDedicated flag passed through from the caller; its exact meaning
-  *        is left to the implementation -- NOTE(review): confirm intended semantics
-  * @param properties additional caller-supplied properties; interpretation is
-  *        left to the implementation
-  * @return the reserved work slots -- NOTE(review): presumably fewer than
-  *         {@code size} (possibly none) when the request cannot be met;
-  *         confirm against implementations and callers
-  */
- List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
deleted file mode 100644
index e755237..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl.strategies;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.CoordinatorConstants;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Slot strategy that reserves all requested work slots on the alert bolts of
- * one single topology, asking the management service for a new topology when
- * no existing one can host the queue.
- *
- * <p>Invariants stated by the original author:
- * <ul>
- * <li>One slot queue lives on one topology only.</li>
- * <li>One topology does not contain two slot queues of the same partition.</li>
- * </ul>
- * The per-partition check that would enforce the second invariant is not
- * active in this class; availability is decided by topology load and by
- * whether a bolt already has a queue.
- *
- * @since Apr 27, 2016
- */
-public class SameTopologySlotStrategy implements IWorkSlotStrategy {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SameTopologySlotStrategy.class);
-
-    private final IScheduleContext context;
-    private final StreamGroup partitionGroup;
-    private final TopologyMgmtService mgmtService;
-
-    // Topologies whose load exceeds this bound are skipped. A per-bolt policy
-    // bound (CoordinatorConstants.POLICIES_PER_BOLT) existed but is disabled.
-    private final double topoLoadUpbound;
-
-    /**
-     * Creates the strategy and reads the topology load upper bound from the
-     * coordinator section of the configuration returned by
-     * {@code ConfigFactory.load()}.
-     *
-     * @param context schedule context holding the topologies and their usages
-     * @param streamPartitionGroup stream group the slots are reserved for
-     *        (only used in log output by this class)
-     * @param mgmtService service used to create a topology when none fits
-     */
-    public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
-            TopologyMgmtService mgmtService) {
-        this.context = context;
-        this.partitionGroup = streamPartitionGroup;
-        this.mgmtService = mgmtService;
-
-        Config coordinatorConfig = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
-        this.topoLoadUpbound = coordinatorConfig.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
-    }
-
-    /**
-     * Reserves {@code size} work slots on a single topology. Existing
-     * topologies with at least {@code size} alert bolts are tried first; if
-     * none yields slots, a new topology is requested from the management
-     * service and tried.
-     *
-     * @param size number of work slots required
-     * @param isDedicated not used yet
-     * @param properties not read by this implementation
-     * @return the reserved slots, or an empty list when nothing could be reserved
-     */
-    @Override
-    public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) {
-        // priority strategy first???
-        List<WorkSlot> reserved = new ArrayList<>();
-        for (Topology candidate : context.getTopologies().values()) {
-            if (candidate.getNumOfAlertBolt() >= size && getQueueOnTopology(size, reserved, candidate)) {
-                break;
-            }
-        }
-
-        if (!reserved.isEmpty()) {
-            return reserved;
-        }
-        return reserveOnNewTopology(size, reserved);
-    }
-
-    /**
-     * Fallback path: creates a topology through the management service,
-     * registers it (and its usage) in the context, and tries to place the
-     * queue on it.
-     *
-     * @param size number of work slots required
-     * @param reserved the (still empty) list that receives the slots
-     * @return {@code reserved}, or an immutable empty list when the request
-     *         exceeds the supported size or no topology could be created
-     */
-    private List<WorkSlot> reserveOnNewTopology(int size, List<WorkSlot> reserved) {
-        int supportedSize = mgmtService.getNumberOfAlertBoltsInTopology();
-        if (size > supportedSize) {
-            LOG.error("can not find available slots for queue, required size {}, supported size {} !", size, supportedSize);
-            return Collections.emptyList();
-        }
-
-        TopologyMeta created = mgmtService.creatTopology();
-        if (created == null) {
-            LOG.error("can not create topology for given queue requirement, required size {}, requried partition group {} !", size, partitionGroup);
-            return Collections.emptyList();
-        }
-
-        context.getTopologies().put(created.topologyId, created.topology);
-        context.getTopologyUsages().put(created.topologyId, created.usage);
-        if (!getQueueOnTopology(size, reserved, created.topology)) {
-            LOG.error("can not find available slots from new created topology, required size {}. This indicates an error !", size);
-        }
-        return reserved;
-    }
-
-    /**
-     * Tries to find {@code size} available alert bolts on topology {@code t}
-     * and, on success, appends one work slot per bolt to {@code slots}.
-     *
-     * @return {@code true} if exactly {@code size} bolts were collected and
-     *         added; {@code false} otherwise ({@code slots} is left untouched)
-     */
-    private boolean getQueueOnTopology(int size, List<WorkSlot> slots, Topology t) {
-        TopologyUsage usage = context.getTopologyUsages().get(t.getName());
-        if (!isTopologyAvailable(usage)) {
-            return false;
-        }
-
-        List<String> freeBoltIds = new ArrayList<>();
-        for (AlertBoltUsage boltUsage : usage.getAlertUsages().values()) {
-            if (isBoltAvailable(boltUsage)) {
-                freeBoltIds.add(boltUsage.getBoltId());
-            }
-            if (freeBoltIds.size() == size) {
-                break;
-            }
-        }
-
-        if (freeBoltIds.size() != size) {
-            return false;
-        }
-        for (String freeBoltId : freeBoltIds) {
-            slots.add(new WorkSlot(t.getName(), freeBoltId));
-        }
-        return true;
-    }
-
-    /**
-     * A topology is available when it has a usage record whose load is not
-     * above the configured upper bound. (A check that rejected topologies
-     * already monitoring the same stream partition is disabled.)
-     */
-    private boolean isTopologyAvailable(TopologyUsage u) {
-        // Written as a negated '>' on purpose: a NaN load still counts as available, as before.
-        return u != null && !(u.getLoad() > topoLoadUpbound);
-    }
-
-    /**
-     * A bolt is available only while no queue is assigned to it.
-     */
-    private boolean isBoltAvailable(AlertBoltUsage alertUsage) {
-        // FIXME : more detail to compare on queue exclusion check
-        // (a comparison of the bolt's policy count against a per-bolt bound is disabled)
-        return alertUsage.getQueueSize() <= 0;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
deleted file mode 100644
index e9148f5..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-/**
- * Usage record kept for one alert bolt: the names of the policies added to
- * it, the stream groups and slot queues scheduled on it, and a load figure.
- *
- * @since Mar 28, 2016
- */
-public class AlertBoltUsage {
-
-    private String boltId;
-    private double load;
-    private List<String> policies = new ArrayList<>();
-    // the stream partitions group that scheduled for this given alert bolt
-    private List<StreamGroup> partitions = new ArrayList<>();
-    // the slot queue that scheduled for this given alert bolt
-    private List<StreamWorkSlotQueue> referQueues = new ArrayList<>();
-
-    /**
-     * @param anid id of the alert bolt this record describes
-     */
-    public AlertBoltUsage(String anid) {
-        this.boltId = anid;
-    }
-
-    /** @return the id of the bolt */
-    public String getBoltId() {
-        return boltId;
-    }
-
-    /** @param boltId the new id of the bolt */
-    public void setBoltId(String boltId) {
-        this.boltId = boltId;
-    }
-
-    /** @return the load value last set on this record */
-    public double getLoad() {
-        return load;
-    }
-
-    /** @param load the load value to store */
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-    /** @return the live list of policy names added so far */
-    public List<String> getPolicies() {
-        return policies;
-    }
-
-    /**
-     * Records the name of the given policy. The policy's partitions are not
-     * registered here (that step is disabled).
-     *
-     * @param pd the policy definition whose name is recorded
-     */
-    public void addPolicies(PolicyDefinition pd) {
-        policies.add(pd.getName());
-    }
-
-    /** @return the live list of stream groups registered through {@link #addQueue} */
-    public List<StreamGroup> getPartitions() {
-        return partitions;
-    }
-
-    /** @return the live list of queues currently referring to this bolt */
-    public List<StreamWorkSlotQueue> getReferQueues() {
-        return referQueues;
-    }
-
-    /** @return the number of queues currently referring to this bolt */
-    public int getQueueSize() {
-        return referQueues.size();
-    }
-
-    /**
-     * Registers a queue together with the stream group it serves.
-     *
-     * @param streamPartition the stream group served by the queue
-     * @param queue the queue scheduled on this bolt
-     */
-    public void addQueue(StreamGroup streamPartition, StreamWorkSlotQueue queue) {
-        this.referQueues.add(queue);
-        this.partitions.add(streamPartition);
-    }
-
-    /**
-     * Unregisters a queue. Only the queue list is updated; the stream group
-     * added alongside it by {@link #addQueue} stays in the partition list.
-     *
-     * @param queue the queue to remove
-     */
-    public void removeQueue(StreamWorkSlotQueue queue) {
-        this.referQueues.remove(queue);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
deleted file mode 100644
index 86238d1..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-
-/**
- * Usage record kept for one group bolt: its id and a load figure.
- *
- * <p>Stream, filter and group-by metadata members were sketched for this
- * class but are disabled, so no such state is held here.
- *
- * @since Mar 28, 2016
- */
-public class GroupBoltUsage {
-
-    private String boltId;
-    private double load;
-
-    /**
-     * @param boltId id of the group bolt this record describes
-     */
-    public GroupBoltUsage(String boltId) {
-        this.boltId = boltId;
-    }
-
-    /** @return the id of the bolt */
-    public String getBoltId() {
-        return boltId;
-    }
-
-    /** @param boltId the new id of the bolt */
-    public void setBoltId(String boltId) {
-        this.boltId = boltId;
-    }
-
-    /** @return the load value last set on this record ({@code 0.0} until set) */
-    public double getLoad() {
-        return load;
-    }
-
-    /** @param load the load value to store */
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
deleted file mode 100644
index 6eb6195..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-
-/**
- * Usage record kept for one topology: its name, data sources and policies,
- * the per-bolt usage records for its alert and group bolts, the monitored
- * streams assigned to it, and a load figure.
- *
- * @since Mar 27, 2016
- */
-public class TopologyUsage {
-    // topo info
-    private String topoName;
-    private final Set<String> datasources = new HashSet<>();
-    // usage information
-    private final Set<String> policies = new HashSet<>();
-    private final Map<String, AlertBoltUsage> alertUsages = new HashMap<>();
-    private final Map<String, GroupBoltUsage> groupUsages = new HashMap<>();
-    private final List<MonitoredStream> monitoredStream = new ArrayList<>();
-
-    private double load;
-
-    // NOTE(review): the original source carried the following remark directly
-    // above the no-arg constructor; it reads like documentation for a metadata
-    // member that is not declared in this class: "This is to be the
-    // existing/previous meta-data. Only one group meta-data for all of the
-    // group bolts in this topology."
-
-    /** Creates a usage record without a topology name. */
-    public TopologyUsage() {
-    }
-
-    /**
-     * @param name name of the topology this record describes
-     */
-    public TopologyUsage(String name) {
-        this.topoName = name;
-    }
-
-    /** @return the topology name, or {@code null} if none was set */
-    public String getTopoName() {
-        return topoName;
-    }
-
-    /** @param topoId the value to store as the topology name */
-    public void setTopoName(String topoId) {
-        this.topoName = topoId;
-    }
-
-    /** @return the live set of data source names */
-    public Set<String> getDataSources() {
-        return datasources;
-    }
-
-    /** @return the live set of policy names */
-    public Set<String> getPolicies() {
-        return policies;
-    }
-
-    /** @return the live map from alert bolt id to its usage record */
-    public Map<String, AlertBoltUsage> getAlertUsages() {
-        return alertUsages;
-    }
-
-    /**
-     * @param boltId id of an alert bolt
-     * @return the usage record stored under that id, or {@code null} if there is none
-     */
-    public AlertBoltUsage getAlertBoltUsage(String boltId) {
-        return alertUsages.get(boltId);
-    }
-
-    /** @return the live map from group bolt id to its usage record */
-    public Map<String, GroupBoltUsage> getGroupUsages() {
-        return groupUsages;
-    }
-
-    /** @return the live list of monitored streams assigned to this topology */
-    public List<MonitoredStream> getMonitoredStream() {
-        return monitoredStream;
-    }
-
-    /**
-     * Adds a monitored stream unless the list already contains it (as decided
-     * by {@link List#contains(Object)}).
-     *
-     * @param par the monitored stream to add
-     */
-    public void addMonitoredStream(MonitoredStream par) {
-        if (this.monitoredStream.contains(par)) {
-            return;
-        }
-        this.monitoredStream.add(par);
-    }
-
-    /** @return the load value last set on this record */
-    public double getLoad() {
-        return load;
-    }
-
-    /** @param load the load value to store */
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-}