You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:16 UTC
[15/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle
problems on eagle-alert module and enable failOnViolation
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
index fb13ad5..ba06033 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
@@ -12,9 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
log4j.rootLogger=DEBUG, stdout
-
# standard output
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
index 205905b..5a78b6a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
@@ -1,42 +1,45 @@
{
- "streamId": "perfmon_cpu_stream",
- "dataSource" : "perfmon_datasource",
- "description":"the data stream for perfmon cpu metrics",
- "validate": false,
- "timeseries":false,
- "columns": [
- {
- "name": "host",
- "type" : "string",
- "defaultValue": "",
- "required":true
- },
- {
- "name": "timestamp",
- "type" : "long",
- "defaultValue": 0,
- "required":true
- },{
- "name": "floatField",
- "type" : "float",
- "defaultValue": "1.2",
- "required": true
- },{
- "name": "intField",
- "type" : "int",
- "defaultValue": "3",
- "required":true
- },{
- "name": "value",
- "type" : "double",
- "defaultValue": 0.0,
- "required":true
- },
- {
- "name": "boolField",
- "type" : "bool",
- "defaultValue": true,
- "required":true
- }
- ]
+ "streamId": "perfmon_cpu_stream",
+ "dataSource": "perfmon_datasource",
+ "description": "the data stream for perfmon cpu metrics",
+ "validate": false,
+ "timeseries": false,
+ "columns": [
+ {
+ "name": "host",
+ "type": "string",
+ "defaultValue": "",
+ "required": true
+ },
+ {
+ "name": "timestamp",
+ "type": "long",
+ "defaultValue": 0,
+ "required": true
+ },
+ {
+ "name": "floatField",
+ "type": "float",
+ "defaultValue": "1.2",
+ "required": true
+ },
+ {
+ "name": "intField",
+ "type": "int",
+ "defaultValue": "3",
+ "required": true
+ },
+ {
+ "name": "value",
+ "type": "double",
+ "defaultValue": 0.0,
+ "required": true
+ },
+ {
+ "name": "boolField",
+ "type": "bool",
+ "defaultValue": true,
+ "required": true
+ }
+ ]
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
index bd06919..4589e63 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
@@ -50,16 +50,16 @@
<artifactId>jersey-multipart</artifactId>
</dependency>
<!--<dependency>-->
- <!--<groupId>org.codehaus.jackson</groupId>-->
- <!--<artifactId>jackson-mapper-asl</artifactId>-->
+ <!--<groupId>org.codehaus.jackson</groupId>-->
+ <!--<artifactId>jackson-mapper-asl</artifactId>-->
<!--</dependency>-->
<!--<dependency>-->
- <!--<groupId>org.codehaus.jackson</groupId>-->
- <!--<artifactId>jackson-jaxrs</artifactId>-->
+ <!--<groupId>org.codehaus.jackson</groupId>-->
+ <!--<artifactId>jackson-jaxrs</artifactId>-->
<!--</dependency>-->
<!--<dependency>-->
- <!--<groupId>org.codehaus.jackson</groupId>-->
- <!--<artifactId>jackson-xc</artifactId>-->
+ <!--<groupId>org.codehaus.jackson</groupId>-->
+ <!--<artifactId>jackson-xc</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index 5c455f6..563db58 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -16,15 +16,6 @@
*/
package org.apache.eagle.alert.coordinator;
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.eagle.alert.config.ConfigBusProducer;
import org.apache.eagle.alert.config.ConfigValue;
import org.apache.eagle.alert.config.ZKConfig;
@@ -37,32 +28,43 @@ import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.service.IMetadataServiceClient;
import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.base.Stopwatch;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* TODO: To simply avoid concurrent calls of schedule, the schedule method is made synchronized. This is not safe with multiple
* instances; consider a distributed lock to prevent multiple schedules from happening concurrently.
+ *
+ * <p>Coordinator is a standalone java application, which listens to policy changes and uses a schedule
+ * algorithm to distribute policies 1) reacting to shutdown events 2) starting a non-daemon thread to pull policies
+ * and figure out if policies are changed</p>
*
- * @since Mar 24, 2016 Coordinator is a standalone java application, which listens to policy changes and use schedule
- * algorithm to distribute policies 1) reacting to shutdown events 2) start non-daemon thread to pull policies
- * and figure out if polices are changed
+ * @since Mar 24, 2016
*/
public class Coordinator {
private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
private static final String COORDINATOR = "coordinator";
+
/**
- * {@link ZKMetadataChangeNotifyService}
- * /alert/{topologyName}/spout
- * /router
- * /alert
- * /publisher
+ * /alert/{topologyName}/spout
+ * /router
+ * /alert
+ * /publisher
+ * .
*/
private static final String ZK_ALERT_CONFIG_SPOUT = "{0}/spout";
private static final String ZK_ALERT_CONFIG_ROUTER = "{0}/router";
@@ -70,16 +72,16 @@ public class Coordinator {
private static final String ZK_ALERT_CONFIG_PUBLISHER = "{0}/publisher";
- private final static String METADATA_SERVICE_HOST = "metadataService.host";
- private final static String METADATA_SERVICE_PORT = "metadataService.port";
- private final static String METADATA_SERVICE_CONTEXT = "metadataService.context";
- private final static String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
- private final static String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
-
- private final static String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader";
- private final static String POLICY_SCHEDULER_ZK_PATH = "/alert/policy/leader";
- private final static int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 2000;
- private final static int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 90; //about 9 minutes
+ private static final String METADATA_SERVICE_HOST = "metadataService.host";
+ private static final String METADATA_SERVICE_PORT = "metadataService.port";
+ private static final String METADATA_SERVICE_CONTEXT = "metadataService.context";
+ private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
+ private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
+
+ private static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader";
+ private static final String POLICY_SCHEDULER_ZK_PATH = "/alert/policy/leader";
+ private static final int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 2000;
+ private static final int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 90; //about 3 minutes (90 retries x 2000 ms wait interval)
private volatile ScheduleState currentState = null;
private final ConfigBusProducer producer;
@@ -103,58 +105,60 @@ public class Coordinator {
}
public synchronized ScheduleState schedule(ScheduleOption option) {
- ScheduleZkState scheduleZkState = new ScheduleZkState();
- ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
- @Override
- public void run() throws Exception {
- scheduleZkState.scheduleAcquired = true;
-
- while (!scheduleZkState.scheduleCompleted) {
- Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
- }
- }
- };
- ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, exclusiveRunnable);
- int waitMaxTimes = 0;
- while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 3 minutes waiting
- if (!scheduleZkState.scheduleAcquired) {
- waitMaxTimes ++;
- try {
- Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
- } catch (InterruptedException e) {}
- continue;
- }
-
- ScheduleState state = null;
- try {
- Stopwatch watch = Stopwatch.createStarted();
- IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
- TopologyMgmtService mgmtService = new TopologyMgmtService();
- IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-
- scheduler.init(context, mgmtService);
- state = scheduler.schedule(option);
-
- long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
- state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
- watch.reset();
- watch.start();
-
- // persist & notify
- postSchedule(client, state, producer);
-
- watch.stop();
- long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
- LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime);
-
- currentState = state;
- } finally {
- //schedule completed
- scheduleZkState.scheduleCompleted = true;
- }
- return state;
- }
- throw new LockWebApplicationException("Acquire scheduler lock failed, please retry later");
+ ScheduleZkState scheduleZkState = new ScheduleZkState();
+ ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
+ @Override
+ public void run() throws Exception {
+ scheduleZkState.scheduleAcquired = true;
+
+ while (!scheduleZkState.scheduleCompleted) {
+ Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+ }
+ }
+ };
+ ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, exclusiveRunnable);
+ int waitMaxTimes = 0;
+ while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 3 minutes waiting
+ if (!scheduleZkState.scheduleAcquired) {
+ waitMaxTimes++;
+ try {
+ Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ // ignored
+ }
+ continue;
+ }
+
+ ScheduleState state = null;
+ try {
+ Stopwatch watch = Stopwatch.createStarted();
+ IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
+ TopologyMgmtService mgmtService = new TopologyMgmtService();
+ IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
+
+ scheduler.init(context, mgmtService);
+ state = scheduler.schedule(option);
+
+ long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
+ state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
+ watch.reset();
+ watch.start();
+
+ // persist & notify
+ postSchedule(client, state, producer);
+
+ watch.stop();
+ long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
+ LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime);
+
+ currentState = state;
+ } finally {
+ //schedule completed
+ scheduleZkState.scheduleCompleted = true;
+ }
+ return state;
+ }
+ throw new LockWebApplicationException("Acquire scheduler lock failed, please retry later");
}
public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer producer) {
@@ -187,7 +191,7 @@ public class Coordinator {
}
/**
- * shutdown background threads and release various resources
+ * shutdown background threads and release various resources.
*/
private static class CoordinatorShutdownHook implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorShutdownHook.class);
@@ -215,7 +219,7 @@ public class Coordinator {
}
private static class PolicyChangeHandler implements PolicyChangeListener {
- private final static Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class);
private Config config;
private IMetadataServiceClient client;
@@ -226,11 +230,11 @@ public class Coordinator {
@Override
public void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies,
- Collection<String> removedPolicies, Collection<String> modifiedPolicies) {
+ Collection<String> removedPolicies, Collection<String> modifiedPolicies) {
LOG.info("policy changed ... ");
LOG.info("allPolicies: " + allPolicies + ", addedPolicies: " + addedPolicies + ", removedPolicies: "
- + removedPolicies + ", modifiedPolicies: " + modifiedPolicies);
-
+ + removedPolicies + ", modifiedPolicies: " + modifiedPolicies);
+
CoordinatorTrigger trigger = new CoordinatorTrigger(config, client);
trigger.run();
@@ -239,48 +243,48 @@ public class Coordinator {
public static void main(String[] args) throws Exception {
startSchedule();
-
+
Thread.currentThread().join();
}
public static void startSchedule() {
- ExclusiveExecutor.execute(POLICY_SCHEDULER_ZK_PATH, new ExclusiveExecutor.Runnable() {
-
- @Override
- public void run() throws Exception {
- Config config = ConfigFactory.load().getConfig(COORDINATOR);
- // build dynamic policy loader
- String host = config.getString(METADATA_SERVICE_HOST);
- int port = config.getInt(METADATA_SERVICE_PORT);
- String context = config.getString(METADATA_SERVICE_CONTEXT);
- IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
- DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
- loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
-
- // schedule dynamic policy loader
- long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
- long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
- ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setDaemon(true);
- return t;
- }
- });
- scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
-
- //
- scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY,
- CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, TimeUnit.MILLISECONDS);
-
- Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
- LOG.info("Eagle Coordinator started ...");
-
+ ExclusiveExecutor.execute(POLICY_SCHEDULER_ZK_PATH, new ExclusiveExecutor.Runnable() {
+
+ @Override
+ public void run() throws Exception {
+ Config config = ConfigFactory.load().getConfig(COORDINATOR);
+ // build dynamic policy loader
+ String host = config.getString(METADATA_SERVICE_HOST);
+ int port = config.getInt(METADATA_SERVICE_PORT);
+ String context = config.getString(METADATA_SERVICE_CONTEXT);
+ IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
+ DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
+ loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
+
+ // schedule dynamic policy loader
+ long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
+ long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
+ ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
+
+ //
+ scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY,
+ CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, TimeUnit.MILLISECONDS);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
+ LOG.info("Eagle Coordinator started ...");
+
Thread.currentThread().join();
- }
-
- });
+ }
+
+ });
}
public void enforcePeriodicallyBuild() {
@@ -294,10 +298,10 @@ public class Coordinator {
public static boolean isPeriodicallyForceBuildEnable() {
return forcePeriodicallyBuild.get();
}
-
+
public static class ScheduleZkState {
- volatile boolean scheduleAcquired = false;
+ volatile boolean scheduleAcquired = false;
volatile boolean scheduleCompleted = false;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
index 3458b3e..0a09de2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
@@ -16,10 +16,6 @@
*/
package org.apache.eagle.alert.coordinator;
-/**
- * @since Apr 22, 2016
- *
- */
public class CoordinatorConstants {
public static final String CONFIG_ITEM_COORDINATOR = "coordinator";
public static final String CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND = "topologyLoadUpbound";
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
index c9bda16..7ebf26a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
@@ -16,29 +16,27 @@
*/
package org.apache.eagle.alert.coordinator;
-import javax.servlet.ServletContextEvent;
-import javax.servlet.ServletContextListener;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
/**
- * @since Jun 16, 2016
- *
+ * @since Jun 16, 2016.
*/
public class CoordinatorListener implements ServletContextListener {
-
+
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorListener.class);
-
+
public CoordinatorListener() {
}
-
+
@Override
public void contextInitialized(ServletContextEvent sce) {
- LOG.info("start coordinator background tasks..");
+ LOG.info("start coordinator background tasks..");
Coordinator.startSchedule();
}
-
+
@Override
public void contextDestroyed(ServletContextEvent sce) {
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
index 96e6fce..74329e3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
@@ -16,6 +16,10 @@
*/
package org.apache.eagle.alert.coordinator;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -25,94 +29,88 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
public class ExclusiveExecutor {
- private static final Logger LOG = LoggerFactory.getLogger(ExclusiveExecutor.class);
-
- // private static final String PATH = "/alert/listener/leader";
- private static final String COORDINATOR = "coordinator";
- private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000;
- private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3;
+ private static final Logger LOG = LoggerFactory.getLogger(ExclusiveExecutor.class);
- private static final CuratorFramework client;
+ // private static final String PATH = "/alert/listener/leader";
+ private static final String COORDINATOR = "coordinator";
+ private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000;
+ private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3;
- static {
- Config config = ConfigFactory.load().getConfig(COORDINATOR);
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(ZK_RETRYPOLICY_SLEEP_TIME_MS, ZK_RETRYPOLICY_MAX_RETRIES);
- ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
- client = CuratorFrameworkFactory.newClient(zkConfig.zkQuorum, retryPolicy);
- client.start();
- }
+ private static final CuratorFramework client;
- public static abstract class Runnable {
+ static {
+ Config config = ConfigFactory.load().getConfig(COORDINATOR);
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(ZK_RETRYPOLICY_SLEEP_TIME_MS, ZK_RETRYPOLICY_MAX_RETRIES);
+ ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+ client = CuratorFrameworkFactory.newClient(zkConfig.zkQuorum, retryPolicy);
+ client.start();
+ }
- boolean completed = false;
- LeaderSelector selector;
+ public abstract static class Runnable {
+ boolean completed = false;
+ LeaderSelector selector;
- public abstract void run() throws Exception;
+ public abstract void run() throws Exception;
- public void registerResources(LeaderSelector selector) {
- this.selector = selector;
- }
+ public void registerResources(LeaderSelector selector) {
+ this.selector = selector;
+ }
- public void runElegantly() throws Exception {
- this.run();
+ public void runElegantly() throws Exception {
+ this.run();
- LOG.info("Close selector resources {}", this.selector);
- CloseableUtils.closeQuietly(this.selector);
+ LOG.info("Close selector resources {}", this.selector);
+ CloseableUtils.closeQuietly(this.selector);
- completed = true;
- }
+ completed = true;
+ }
- public boolean isCompleted() {
- return completed;
- }
+ public boolean isCompleted() {
+ return completed;
+ }
- }
+ }
- public static void execute(String path, final Runnable runnable) {
- LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
+ public static void execute(String path, final Runnable runnable) {
+ LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
- @Override
- public void takeLeadership(CuratorFramework client) throws Exception {
- // this callback will get called when you are the leader
- // do whatever leader work you need to and only exit
- // this method when you want to relinquish leadership
- LOG.info("this is leader node right now..");
- runnable.runElegantly();
- }
+ @Override
+ public void takeLeadership(CuratorFramework client) throws Exception {
+ // this callback will get called when you are the leader
+ // do whatever leader work you need to and only exit
+ // this method when you want to relinquish leadership
+ LOG.info("this is leader node right now..");
+ runnable.runElegantly();
+ }
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
- LOG.info(String.format("leader selector state change listener, new state: %s", newState.toString()));
- }
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ LOG.info(String.format("leader selector state change listener, new state: %s", newState.toString()));
+ }
- };
+ };
- LeaderSelector selector = new LeaderSelector(client, path, listener);
- selector.autoRequeue(); // not required, but this is behavior that you
- // will probably expect
- selector.start();
+ LeaderSelector selector = new LeaderSelector(client, path, listener);
+ selector.autoRequeue(); // not required, but this is behavior that you
+ // will probably expect
+ selector.start();
- runnable.registerResources(selector);
+ runnable.registerResources(selector);
- Runtime.getRuntime().addShutdownHook(new Thread(new java.lang.Runnable() {
+ Runtime.getRuntime().addShutdownHook(new Thread(new java.lang.Runnable() {
- @Override
- public void run() {
- LOG.info("Close zk client resources {}", ExclusiveExecutor.client);
- CloseableUtils.closeQuietly(ExclusiveExecutor.client);
- }
+ @Override
+ public void run() {
+ LOG.info("Close zk client resources {}", ExclusiveExecutor.client);
+ CloseableUtils.closeQuietly(ExclusiveExecutor.client);
+ }
- }));
- }
+ }));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
index 5e61443..0cf4a5d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
@@ -19,8 +19,7 @@ package org.apache.eagle.alert.coordinator;
import org.apache.eagle.alert.coordination.model.ScheduleState;
/**
- * @since Mar 24, 2016
- *
+ * @since Mar 24, 2016.
*/
public interface IPolicyScheduler {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
index 0cde22d..b21948b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
@@ -16,8 +16,6 @@
*/
package org.apache.eagle.alert.coordinator;
-import java.util.Map;
-
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -28,9 +26,10 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import java.util.Map;
+
/**
- * @since Mar 28, 2016
- *
+ * @since Mar 28, 2016.
*/
public interface IScheduleContext {
@@ -48,7 +47,7 @@ public interface IScheduleContext {
Map<String, PolicyAssignment> getPolicyAssignments();
Map<StreamGroup, MonitoredStream> getMonitoredStreams();
-
+
Map<String, Publishment> getPublishments();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
index 69e799f..29e8cac 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
@@ -21,14 +21,14 @@ import javax.ws.rs.core.Response;
public class LockWebApplicationException extends WebApplicationException {
- private static final long serialVersionUID = 3441072187262776401L;
+ private static final long serialVersionUID = 3441072187262776401L;
- public LockWebApplicationException() {
- super(Response.Status.INTERNAL_SERVER_ERROR);
- }
+ public LockWebApplicationException() {
+ super(Response.Status.INTERNAL_SERVER_ERROR);
+ }
- public LockWebApplicationException(String message) {
- super(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(message).type("text/plain").build());
- }
+ public LockWebApplicationException(String message) {
+ super(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(message).type("text/plain").build());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
index 8bccb53..c1bc726 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
@@ -19,8 +19,7 @@ package org.apache.eagle.alert.coordinator;
import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
/**
- * @since Mar 24, 2016
- *
+ * @since Mar 24, 2016.
*/
public class PolicySchedulerFactory {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
index 6c04e61..6fe27c5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
@@ -18,11 +18,9 @@ package org.apache.eagle.alert.coordinator;
/**
* A runtime option for one schedule processing.
- *
- * Could used for configuration override.
- *
- * @since Apr 19, 2016
+ * <p>Could be used for configuration override.</p>
*
+ * @since Apr 19, 2016
*/
public class ScheduleOption {
private int policiesPerBolt;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
index 4ae07f5..4ca9d5e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
@@ -19,18 +19,16 @@ package org.apache.eagle.alert.coordinator;
import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
import static org.apache.eagle.alert.coordinator.CoordinatorConstants.NUM_OF_ALERT_BOLTS_PER_TOPOLOGY;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.coordinator.model.TopologyUsage;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import java.util.List;
/**
- * @since Mar 29, 2016
- *
+ * @since Mar 29, 2016.
*/
public class TopologyMgmtService {
@@ -61,21 +59,19 @@ public class TopologyMgmtService {
boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM);
numberOfBoltsPerTopology = config.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY);
}
-
+
public int getNumberOfAlertBoltsInTopology() {
return numberOfBoltsPerTopology;
}
/**
- * TODO: call topology mgmt API to create a topology
- *
- * @return
+ * TODO: call topology mgmt API to create a topology.
*/
public TopologyMeta creatTopology() {
// TODO
throw new UnsupportedOperationException("not supported yet!");
}
-
+
public List<TopologyMeta> listTopologies() {
// TODO
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
index ebc533e..49a16ff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -16,24 +16,10 @@
*/
package org.apache.eagle.alert.coordinator.impl;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_BOLT_LOAD_UPBOUND;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICIES_PER_BOLT;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICY_DEFAULT_PARALLELISM;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.*;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordination.model.internal.*;
import org.apache.eagle.alert.coordinator.IPolicyScheduler;
import org.apache.eagle.alert.coordinator.IScheduleContext;
import org.apache.eagle.alert.coordinator.ScheduleOption;
@@ -44,31 +30,31 @@ import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
/**
* A simple greedy assigner. <br/>
- * A greedy assigner simply loop the policies, find the most suitable topology
+ *
+ * <p>A greedy assigner simply loop the policies, find the most suitable topology
* to locate the policy first, then assign the topics to corresponding
- * spouts/group-by bolts.
- *
- * <br/>
- * For each given policy, the greedy steps are
+ * spouts/group-by bolts.</p>
+ *
+ * <p>For each given policy, the greedy steps are</p>
+ *
* <ul>
* <li>1. Find the same topology that already serve the policy without exceed the load</li>
* <li>2. Find the topology that already take the source traffic without exceed the load</li>
* <li>3. Find the topology that available to place source topic without exceed the load</li>
* <li>4. Create a new topology and locate the policy</li>
* <li>Route table generated after all policies assigned</li>
- * <ul>
- * <br/>
- *
+ * </ul>
* @since Mar 24, 2016
- *
*/
public class GreedyPolicyScheduler implements IPolicyScheduler {
@@ -111,8 +97,8 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
* Answer: two possible place: a global route table will be generated, those target not in current topology tuples will be dropped. This make the partition for tuple to alert
* <li>How to support add topology on demand by evaluate the available topology bandwidth(need topology level load)?</li>
* Answer: Use configured topology load up-bound, when topology load is available, will adopt
- * <ul>
- * <pre>
+ * </ul>
+ * </pre>
*/
List<ScheduleResult> results = new ArrayList<ScheduleResult>();
Map<String, PolicyAssignment> newAssignments = new HashMap<String, PolicyAssignment>();
@@ -152,13 +138,13 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
}
private ScheduleState generateMonitorMetadata(List<WorkItem> expandworkSets,
- Map<String, PolicyAssignment> newAssignments) {
+ Map<String, PolicyAssignment> newAssignments) {
MonitorMetadataGenerator generator = new MonitorMetadataGenerator(context);
return generator.generate(expandworkSets);
}
private void placePolicy(PolicyDefinition def, AlertBoltUsage alertBoltUsage, Topology targetTopology,
- TopologyUsage usage) {
+ TopologyUsage usage) {
String policyName = def.getName();
// topology usage update
@@ -169,26 +155,26 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
// update source topics
updateDataSource(usage, def);
-
+
// update group-by
updateGrouping(usage, def);
}
private void updateGrouping(TopologyUsage usage, PolicyDefinition def) {
// groupByMeta is removed since groupspec generate doesnt need it now.
-// List<StreamPartition> policyPartitionSpec = def.getPartitionSpec();
-// Map<String, List<StreamPartition>> groupByMeta = usage.getGroupByMeta();
-// for (StreamPartition par : policyPartitionSpec) {
-// List<StreamPartition> partitions = groupByMeta.get(par.getStreamId());
-// if (partitions == null) {
-// partitions = new ArrayList<StreamPartition>();
-// // de-dup of the partition on the list?
-// groupByMeta.put(par.getStreamId(), partitions);
-// }
-// if (!partitions.contains(par)) {
-// partitions.add(par);
-// }
-// }
+ // List<StreamPartition> policyPartitionSpec = def.getPartitionSpec();
+ // Map<String, List<StreamPartition>> groupByMeta = usage.getGroupByMeta();
+ // for (StreamPartition par : policyPartitionSpec) {
+ // List<StreamPartition> partitions = groupByMeta.get(par.getStreamId());
+ // if (partitions == null) {
+ // partitions = new ArrayList<StreamPartition>();
+ // // de-dup of the partition on the list?
+ // groupByMeta.put(par.getStreamId(), partitions);
+ // }
+ // if (!partitions.contains(par)) {
+ // partitions.add(par);
+ // }
+ // }
}
private void updateDataSource(TopologyUsage usage, PolicyDefinition def) {
@@ -216,13 +202,13 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
* <li>3. Find the topology that available to place source topic</li>
* <li>4. Create a new topology and locate the policy</li>
* <li>Route table generated after all policies assigned</li>
- * <ul>
+ * </ul>
* <br/>
- *
+ *
* @param newAssignments
*/
private ScheduleResult schedulePolicy(WorkItem item, Map<String, PolicyAssignment> newAssignments) {
- LOG.info(" schedule for {}", item );
+ LOG.info(" schedule for {}", item);
String policyName = item.def.getName();
StreamGroup policyStreamPartition = new StreamGroup();
@@ -260,14 +246,14 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
}
private void placePolicyToQueue(PolicyDefinition def, StreamWorkSlotQueue queue,
- Map<String, PolicyAssignment> newAssignments) {
+ Map<String, PolicyAssignment> newAssignments) {
for (WorkSlot slot : queue.getWorkingSlots()) {
Topology targetTopology = context.getTopologies().get(slot.getTopologyName());
TopologyUsage usage = context.getTopologyUsages().get(slot.getTopologyName());
AlertBoltUsage alertBoltUsage = usage.getAlertBoltUsage(slot.getBoltId());
placePolicy(def, alertBoltUsage, targetTopology, usage);
}
-// queue.placePolicy(def);
+ // queue.placePolicy(def);
PolicyAssignment assignment = new PolicyAssignment(def.getName(), queue.getQueueId());
context.getPolicyAssignments().put(def.getName(), assignment);
newAssignments.put(def.getName(), assignment);
@@ -286,22 +272,19 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
WorkQueueBuilder builder = new WorkQueueBuilder(context, mgmtService);
// TODO : get the properties from policy definiton
targetQueue = builder.createQueue(targetdStream, false, getQueueSize(def.getParallelismHint()),
- new HashMap<String, Object>());
+ new HashMap<String, Object>());
}
return targetQueue;
}
/**
- * Some strategy to generate correct size in Startegy of queue builder
- *
- * @param hint
- * @return
+ * Some strategy to generate correct size in Strategy of queue builder.
*/
private int getQueueSize(int hint) {
- if (hint == 0) {
- // some policies require single bolt to execute
- return 1;
- }
+ if (hint == 0) {
+ // some policies require single bolt to execute
+ return 1;
+ }
return initialQueueSize * ((hint + initialQueueSize - 1) / initialQueueSize);
}
@@ -323,7 +306,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) {
// overload or over policy # or already contains
if (boltUsage == null || boltUsage.getLoad() > boltLoadUpbound
- || boltUsage.getPolicies().size() > policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
+ || boltUsage.getPolicies().size() > policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
return false;
}
return true;
@@ -333,7 +316,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
this.context = new InMemScheduleConext(context);
this.mgmtService = mgmtService;
}
-
+
public IScheduleContext getContext() {
return context;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
index 40f16e9..c5c992b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -16,22 +16,7 @@
*/
package org.apache.eagle.alert.coordinator.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.coordination.model.*;
import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
@@ -46,12 +31,18 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
- * @since Apr 26, 2016
* Given current policy placement, figure out monitor metadata
- *
- * TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create....
- * FIXME: too many duplicated code logic : check null; add list to map; add to list..
+ *
+ * <p>TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create....
+ * FIXME: too many duplicated code logic : check null; add list to map; add to list..</p>
+ *
+ * @since Apr 26, 2016
*/
public class MonitorMetadataGenerator {
@@ -76,15 +67,15 @@ public class MonitorMetadataGenerator {
Map<String, PublishSpec> publishSpecsMap = generatePublishMetadata();
String uniqueVersion = generateVersion();
- ScheduleState status = new ScheduleState(uniqueVersion,
- topoSpoutSpecsMap,
- groupSpecsMap,
- alertSpecsMap,
- publishSpecsMap,
- context.getPolicyAssignments().values(),
- context.getMonitoredStreams().values(),
- context.getPolicies().values(),
- context.getStreamSchemas().values());
+ ScheduleState status = new ScheduleState(uniqueVersion,
+ topoSpoutSpecsMap,
+ groupSpecsMap,
+ alertSpecsMap,
+ publishSpecsMap,
+ context.getPolicyAssignments().values(),
+ context.getMonitoredStreams().values(),
+ context.getPolicies().values(),
+ context.getStreamSchemas().values());
return status;
}
@@ -127,7 +118,7 @@ public class MonitorMetadataGenerator {
}
/**
- * FIXME: add auto-increment version number?
+ * FIXME: consider adding an auto-increment version number.
*/
private String generateVersion() {
return "spec_version_" + System.currentTimeMillis();
@@ -159,7 +150,7 @@ public class MonitorMetadataGenerator {
spec = new RouterSpec(u.getTopoName());
groupSpecsMap.put(u.getTopoName(), spec);
}
-
+
for (MonitoredStream ms : u.getMonitoredStream()) {
// mutiple stream on the same policy group : for correlation group case:
for (StreamPartition partiton : ms.getStreamGroup().getStreamPartitions()) {
@@ -184,7 +175,7 @@ public class MonitorMetadataGenerator {
private Map<String, SpoutSpec> generateSpoutMonitorMetadata() {
Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap();
-
+
Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<String, SpoutSpec>();
// streamName -> StreamDefinition
Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas();
@@ -207,7 +198,7 @@ public class MonitorMetadataGenerator {
Map<String, List<StreamRepartitionMetadata>> streamsMap = new HashMap<String, List<StreamRepartitionMetadata>>();
for (String policyName : usage.getPolicies()) {
PolicyDefinition def = context.getPolicies().get(policyName);
-
+
PolicyAssignment assignment = context.getPolicyAssignments().get(policyName);
if (assignment == null) {
LOG.error(" can not find assignment for policy {} ! ", policyName);
@@ -245,9 +236,7 @@ public class MonitorMetadataGenerator {
/**
* Work queue not a root level object, thus we need to build a map from
- * MonitoredStream for later quick lookup
- *
- * @return
+ * MonitoredStream for later quick lookup.
*/
private Map<String, StreamWorkSlotQueue> buildQueueMap() {
Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
@@ -260,7 +249,7 @@ public class MonitorMetadataGenerator {
}
private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream,
- StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
+ StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
List<StreamRepartitionMetadata> dsStreamMeta;
if (streamsMap.containsKey(topicName)) {
dsStreamMeta = streamsMap.get(topicName);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
index ea96d79..a46537d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
@@ -16,17 +16,14 @@
*/
package org.apache.eagle.alert.coordinator.impl;
-import java.util.List;
-
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import java.util.List;
/**
- * Schedule result for one policy
- *
- *
- * @since Apr 26, 2016
+ * Schedule result for one policy.
*
+ * @since Apr 26, 2016
*/
public class ScheduleResult {
int code;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
index a32b8fb..f44f80e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
@@ -16,10 +16,6 @@
*/
package org.apache.eagle.alert.coordinator.impl;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.eagle.alert.coordination.model.WorkSlot;
import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
@@ -32,9 +28,12 @@ import org.apache.eagle.alert.coordinator.model.TopologyUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
- * @since Apr 27, 2016
- *
+ * @since Apr 27, 2016.
*/
public class WorkQueueBuilder {
@@ -49,7 +48,7 @@ public class WorkQueueBuilder {
}
public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size,
- Map<String, Object> properties) {
+ Map<String, Object> properties) {
// FIXME: make extensible and configurable
IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
@@ -58,7 +57,7 @@ public class WorkQueueBuilder {
return null;
}
StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
- slots);
+ slots);
calculateGroupIndexAndCount(queue);
assignQueueSlots(stream, queue);// build reverse reference
stream.addQueues(queue);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
index 28df3c4..8528606 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
@@ -16,14 +16,12 @@
*/
package org.apache.eagle.alert.coordinator.impl.strategies;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
import java.util.List;
import java.util.Map;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-
/**
- * @since Apr 27, 2016
- *
+ * @since Apr 27, 2016.
*/
public interface IWorkSlotStrategy {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
index e755237..823a548 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
@@ -17,13 +17,6 @@
package org.apache.eagle.alert.coordinator.impl.strategies;
import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
import org.apache.eagle.alert.coordination.model.WorkSlot;
import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
import org.apache.eagle.alert.coordination.model.internal.Topology;
@@ -33,22 +26,22 @@ import org.apache.eagle.alert.coordinator.TopologyMgmtService;
import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.*;
/**
* A simple strategy that only find the bolts in the same topology as the
* required work slots.
- *
- * Invariant:<br/>
+ *
+ * <p>Invariant:<br/>
* One slot queue only on the one topology.<br/>
- * One topology doesn't contains two same partition slot queues.
- *
- * @since Apr 27, 2016
+ * One topology doesn't contain two same partition slot queues.</p>
*
+ * @since Apr 27, 2016
*/
public class SameTopologySlotStrategy implements IWorkSlotStrategy {
@@ -58,28 +51,27 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
private final StreamGroup partitionGroup;
private final TopologyMgmtService mgmtService;
-// private final int numOfPoliciesBoundPerBolt;
+ // private final int numOfPoliciesBoundPerBolt;
private final double topoLoadUpbound;
public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
- TopologyMgmtService mgmtService) {
+ TopologyMgmtService mgmtService) {
this.context = context;
this.partitionGroup = streamPartitionGroup;
this.mgmtService = mgmtService;
Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
-// numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
+ // numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
}
/**
- * @param isDedicated
- * - not used yet!
+ * @param isDedicated - not used yet.
*/
@Override
public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) {
Iterator<Topology> it = context.getTopologies().values().stream().filter((t) -> t.getNumOfAlertBolt() >= size)
- .iterator();
+ .iterator();
// priority strategy first???
List<WorkSlot> slots = new ArrayList<WorkSlot>();
while (it.hasNext()) {
@@ -139,15 +131,15 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
}
private boolean isTopologyAvailable(TopologyUsage u) {
-// for (MonitoredStream stream : u.getMonitoredStream()) {
-// if (partition.equals(stream.getStreamParitition())) {
-// return false;
-// }
-// }
+ // for (MonitoredStream stream : u.getMonitoredStream()) {
+ // if (partition.equals(stream.getStreamParitition())) {
+ // return false;
+ // }
+ // }
if (u == null || u.getLoad() > topoLoadUpbound) {
return false;
}
-
+
return true;
}
@@ -158,7 +150,7 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
}
// actually it's now 0;
return true;
-// return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
+ // return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
index e9148f5..36c0bce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
@@ -16,16 +16,14 @@
*/
package org.apache.eagle.alert.coordinator.model;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import java.util.ArrayList;
+import java.util.List;
/**
- * @since Mar 28, 2016
- *
+ * @since Mar 28, 2016.
*/
public class AlertBoltUsage {
@@ -56,9 +54,9 @@ public class AlertBoltUsage {
public void addPolicies(PolicyDefinition pd) {
policies.add(pd.getName());
// add first partition
-// for (StreamPartition par : pd.getPartitionSpec()) {
-// partitions.add(par);
-// }
+ // for (StreamPartition par : pd.getPartitionSpec()) {
+ // partitions.add(par);
+ // }
}
public double getLoad() {
@@ -76,7 +74,7 @@ public class AlertBoltUsage {
public List<StreamWorkSlotQueue> getReferQueues() {
return referQueues;
}
-
+
public int getQueueSize() {
return referQueues.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
index 86238d1..39788d5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
@@ -16,24 +16,22 @@
*/
package org.apache.eagle.alert.coordinator.model;
-
/**
- * @since Mar 28, 2016
- *
+ * @since Mar 28, 2016.
*/
public class GroupBoltUsage {
private String boltId;
private double load;
-
+
public GroupBoltUsage(String boltId) {
this.boltId = boltId;
}
-// private final Set<String> streams = new HashSet<String>();
-// private final Map<String, StreamFilter> filters = new HashMap<String, StreamFilter>();
+ // private final Set<String> streams = new HashSet<String>();
+ // private final Map<String, StreamFilter> filters = new HashMap<String, StreamFilter>();
-// private final Map<String, List<StreamPartition>> groupByMeta;
+ // private final Map<String, List<StreamPartition>> groupByMeta;
public double getLoad() {
return load;
@@ -43,18 +41,18 @@ public class GroupBoltUsage {
this.load = load;
}
-// public Set<String> getStreams() {
-// return streams;
-// }
-//
-//
-// public Map<String, StreamFilter> getFilters() {
-// return filters;
-// }
+ // public Set<String> getStreams() {
+ // return streams;
+ // }
+ //
+ //
+ // public Map<String, StreamFilter> getFilters() {
+ // return filters;
+ // }
-// public Map<String, List<StreamPartition>> getGroupByMeta() {
-// return groupByMeta;
-// }
+ // public Map<String, List<StreamPartition>> getGroupByMeta() {
+ // return groupByMeta;
+ // }
public String getBoltId() {
return boltId;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
index 6eb6195..3cfc505 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.alert.coordinator.model;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -23,11 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-
/**
- * @since Mar 27, 2016
- *
+ * @since Mar 27, 2016.
*/
public class TopologyUsage {
// topo info
@@ -48,11 +46,11 @@ public class TopologyUsage {
public TopologyUsage() {
}
-
+
public TopologyUsage(String name) {
this.topoName = name;
}
-
+
public String getTopoName() {
return topoName;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
index dea9419..8f6cbc6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
@@ -16,9 +16,6 @@
*/
package org.apache.eagle.alert.coordinator.provider;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -29,10 +26,11 @@ import org.apache.eagle.alert.coordinator.model.TopologyUsage;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import java.util.HashMap;
+import java.util.Map;
/**
- * @since Mar 28, 2016
- *
+ * @since Mar 28, 2016.
*/
public class InMemScheduleConext implements IScheduleContext {
@@ -60,9 +58,9 @@ public class InMemScheduleConext implements IScheduleContext {
}
public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments,
- Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
- Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
- Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
+ Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
+ Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
+ Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
this.topologies = topologies2;
this.policyAssignments = assignments;
this.datasources = kafkaSources;