You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[10/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java b/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
new file mode 100644
index 0000000..e0d048e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/DummyParticipant.java
@@ -0,0 +1,123 @@
+package org.apache.helix.examples;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+public class DummyParticipant
+{
+ // dummy master-slave state model
+ @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
+ public static class DummyMSStateModel extends StateModel
+ {
+ @Transition(to = "SLAVE", from = "OFFLINE")
+ public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
+ {
+ String partitionName = message.getPartitionName();
+ String instanceName = message.getTgtName();
+ System.out.println(instanceName + " becomes SLAVE from OFFLINE for " + partitionName);
+ }
+
+ @Transition(to = "MASTER", from = "SLAVE")
+ public void onBecomeMasterFromSlave(Message message, NotificationContext context)
+ {
+ String partitionName = message.getPartitionName();
+ String instanceName = message.getTgtName();
+ System.out.println(instanceName + " becomes MASTER from SLAVE for " + partitionName);
+ }
+
+ @Transition(to = "SLAVE", from = "MASTER")
+ public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
+ {
+ String partitionName = message.getPartitionName();
+ String instanceName = message.getTgtName();
+ System.out.println(instanceName + " becomes SLAVE from MASTER for " + partitionName);
+ }
+
+ @Transition(to = "OFFLINE", from = "SLAVE")
+ public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
+ {
+ String partitionName = message.getPartitionName();
+ String instanceName = message.getTgtName();
+ System.out.println(instanceName + " becomes OFFLINE from SLAVE for " + partitionName);
+ }
+
+ @Transition(to = "DROPPED", from = "OFFLINE")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+ {
+ String partitionName = message.getPartitionName();
+ String instanceName = message.getTgtName();
+ System.out.println(instanceName + " becomes DROPPED from OFFLINE for " + partitionName);
+ }
+
+ @Transition(to = "OFFLINE", from = "ERROR")
+ public void onBecomeOfflineFromError(Message message, NotificationContext context)
+ {
+ String partitionName = message.getPartitionName();
+ String instanceName = message.getTgtName();
+ System.out.println(instanceName + " becomes OFFLINE from ERROR for " + partitionName);
+ }
+
+ @Override
+ public void reset()
+ {
+ System.out.println("Default MockMSStateModel.reset() invoked");
+ }
+ }
+
+ /** Factory that returns a new {@link DummyMSStateModel} on every createNewStateModel call. */
+ public static class DummyMSModelFactory extends StateModelFactory<DummyMSStateModel>
+ {
+   @Override
+   public DummyMSStateModel createNewStateModel(String partitionName)
+   {
+     // partitionName is not used: the dummy model keeps no per-partition state
+     return new DummyMSStateModel();
+   }
+ }
+
+ /** Runs one dummy participant until interrupted; args are zkAddress, clusterName, instanceName. */
+ public static void main(String[] args)
+ {
+   if (args.length < 3)
+   {
+     System.err.println("USAGE: DummyParticipant zkAddress clusterName instanceName");
+     System.exit(1);
+   }
+   String zkAddr = args[0];
+   String clusterName = args[1];
+   String instanceName = args[2];
+   boolean interrupted = false;
+   HelixManager manager = null;
+   try
+   {
+     manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
+     manager.getStateMachineEngine().registerStateModelFactory("MasterSlave", new DummyMSModelFactory());
+     manager.connect();
+     Thread.currentThread().join(); // a thread joining itself blocks until it is interrupted
+   } catch (InterruptedException e)
+   {
+     interrupted = true; // normal stop request: no stack trace, flag restored after disconnect
+   } catch (Exception e)
+   {
+     e.printStackTrace();
+   } finally
+   {
+     if (manager != null)
+     {
+       manager.disconnect();
+     }
+     if (interrupted)
+     {
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/ExampleHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleHelper.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleHelper.java
new file mode 100644
index 0000000..f27fdad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleHelper.java
@@ -0,0 +1,56 @@
+package org.apache.helix.examples;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+
+/** Helpers for the examples: start and stop a local ZooKeeper server. */
+public class ExampleHelper
+{
+  /**
+   * Starts a ZooKeeper server for {@code zkAddr} after deleting its data and log
+   * directories, which live under /tmp in a folder named after the address with
+   * ':' replaced by '_'.
+   *
+   * @param zkAddr address whose text after the last ':' is parsed as the port
+   * @return the started server
+   */
+  public static ZkServer startZkServer(String zkAddr)
+  {
+    System.out.println("Start zookeeper at " + zkAddr + " in thread " + Thread.currentThread().getName());
+
+    final String baseDir = "/tmp/" + zkAddr.replace(':', '_');
+    final String logDir = baseDir + "/logs";
+    final String dataDir = baseDir + "/dataDir";
+    try
+    {
+      FileUtils.deleteDirectory(new File(dataDir));
+      FileUtils.deleteDirectory(new File(logDir));
+    } catch (IOException e)
+    {
+      e.printStackTrace(); // a failed cleanup does not stop the server from being started
+    }
+
+    int port = Integer.parseInt(zkAddr.substring(zkAddr.lastIndexOf(':') + 1));
+    ZkServer zkServer = new ZkServer(dataDir, logDir, new IDefaultNameSpace() {
+      @Override
+      public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient zkClient) { /* do nothing */ }
+    }, port);
+    zkServer.start();
+    return zkServer;
+  }
+
+  /** Shuts {@code zkServer} down and prints its port; a null argument is ignored. */
+  public static void stopZkServer(ZkServer zkServer)
+  {
+    if (zkServer == null)
+    {
+      return;
+    }
+    zkServer.shutdown();
+    System.out.println("Shut down zookeeper at port " + zkServer.getPort() + " in thread " + Thread.currentThread().getName());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
new file mode 100644
index 0000000..44eb75b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.tools.ClusterStateVerifier;
+
+
+public class ExampleProcess
+{
+
+ public static final String zkServer = "zkSvr";
+ public static final String cluster = "cluster";
+ public static final String hostAddress = "host";
+ public static final String hostPort = "port";
+ public static final String relayCluster = "relayCluster";
+ public static final String help = "help";
+ public static final String configFile = "configFile";
+ public static final String stateModel = "stateModelType";
+ public static final String transDelay = "transDelay";
+
+ private final String zkConnectString;
+ private final String clusterName;
+ private final String instanceName;
+ private final String stateModelType;
+ private HelixManager manager;
+
+// private StateMachineEngine genericStateMachineHandler;
+
+ private String _file = null;
+ private StateModelFactory<StateModel> stateModelFactory;
+ private final int delay;
+
+ /** Stores the given settings; no manager is created and nothing is connected here. */
+ public ExampleProcess(String zkConnectString, String clusterName, String instanceName, String file, String stateModel, int delay)
+ {
+   this.zkConnectString = zkConnectString;
+   this.clusterName = clusterName;
+   this.instanceName = instanceName;
+   this.stateModelType = stateModel;
+   this.delay = delay;
+   _file = file; // may be null; start() then uses the ZooKeeper-backed manager
+ }
+
+ /**
+  * Creates the Helix manager, registers the state model factory matching the
+  * configured state model type, and connects as a participant.
+  *
+  * @throws IllegalArgumentException if the state model type is not MasterSlave,
+  *           OnlineOffline or LeaderStandby (compared ignoring case)
+  * @throws Exception if creating or connecting the manager fails
+  */
+ public void start() throws Exception
+ {
+   // Resolve the factory first so an unsupported type fails before anything is connected.
+   if ("MasterSlave".equalsIgnoreCase(stateModelType))
+   {
+     stateModelFactory = new MasterSlaveStateModelFactory(delay);
+   } else if ("OnlineOffline".equalsIgnoreCase(stateModelType))
+   {
+     stateModelFactory = new OnlineOfflineStateModelFactory(delay);
+   } else if ("LeaderStandby".equalsIgnoreCase(stateModelType))
+   {
+     stateModelFactory = new LeaderStandbyStateModelFactory(delay);
+   } else
+   {
+     throw new IllegalArgumentException("Unsupported state model type: " + stateModelType);
+   }
+
+   if (_file == null)
+   {
+     manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkConnectString);
+   } else
+   {
+     manager = HelixManagerFactory.getStaticFileHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, _file);
+   }
+
+   StateMachineEngine stateMach = manager.getStateMachineEngine();
+   stateMach.registerStateModelFactory(stateModelType, stateModelFactory);
+   manager.connect();
+   manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), stateMach);
+
+   if (_file != null)
+   {
+     // file-based runs additionally invoke the cluster state verifier on the same file
+     ClusterStateVerifier.verifyFileBasedClusterStates(_file, instanceName, stateModelFactory);
+   }
+ }
+
+ /**
+  * Builds the options understood by {@link #processCommandLineArgs(String[])}.
+  * <p>
+  * Registered directly: --help, --cluster, --host, --port, --stateModelType and
+  * --transDelay. --zkSvr and --configFile are registered together through one
+  * option group instead.
+  *
+  * @return the populated option set
+  */
+ @SuppressWarnings("static-access")
+ private static Options constructCommandLineOptions()
+ {
+   Options options = new Options();
+   options.addOption(OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info").create());
+   options.addOption(singleArgOption(cluster,
+       "Provide cluster name",
+       true,
+       "Cluster name (Required)"));
+   options.addOption(singleArgOption(hostAddress,
+       "Provide host name",
+       true,
+       "Host name (Required)"));
+   options.addOption(singleArgOption(hostPort,
+       "Provide host port",
+       true,
+       "Host port (Required)"));
+   options.addOption(singleArgOption(stateModel,
+       "StateModel Type",
+       true,
+       "StateModel Type (Required)"));
+   options.addOption(singleArgOption(transDelay,
+       "Provide state trans delay",
+       false,
+       "Delay time in state transition, in MS"));
+
+   // option group including either --zkSvr or --configFile
+   OptionGroup sourceGroup = new OptionGroup();
+   sourceGroup.addOption(singleArgOption(zkServer,
+       "Provide zookeeper address",
+       true,
+       "ZookeeperServerAddress(Required)"));
+   sourceGroup.addOption(singleArgOption(configFile,
+       "Provide file to read states/messages",
+       true,
+       "File to read states/messages (Optional)"));
+   options.addOptionGroup(sourceGroup);
+   return options;
+ }
+
+ /**
+  * Creates a long-only option that takes exactly one argument.
+  *
+  * @param longOpt long name of the option
+  * @param description help text for the option
+  * @param required value passed to {@link Option#setRequired(boolean)}
+  * @param argName display name passed to {@link Option#setArgName(String)}
+  */
+ @SuppressWarnings("static-access")
+ private static Option singleArgOption(String longOpt, String description, boolean required, String argName)
+ {
+   Option option = OptionBuilder.withLongOpt(longOpt).withDescription(description).create();
+   option.setArgs(1);
+   option.setRequired(required);
+   option.setArgName(argName);
+   return option;
+ }
+
+ /** Prints the help for {@code cliOptions}, using "java " plus this class name as the command. */
+ public static void printUsage(Options cliOptions)
+ {
+   new HelpFormatter().printHelp("java " + ExampleProcess.class.getName(), cliOptions);
+ }
+
+ /**
+  * Parses {@code cliArgs}; a parse error is printed with the usage text and then calls System.exit(1).
+  */
+ public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
+ {
+   Options cliOptions = constructCommandLineOptions();
+   CommandLine parsed = null;
+   try
+   {
+     parsed = new GnuParser().parse(cliOptions, cliArgs);
+   } catch (ParseException pe)
+   {
+     System.err.println("CommandLineClient: failed to parse command-line options: " + pe.toString());
+     printUsage(cliOptions);
+     System.exit(1);
+   }
+   return parsed;
+ }
+
+ /**
+  * Starts one example participant and blocks until the thread is interrupted.
+  * Without arguments the built-in local defaults below are used; otherwise the
+  * values are taken from the parsed command line.
+  *
+  * @param args command-line arguments, may be empty
+  * @throws Exception if parsing, starting the process or waiting fails
+  */
+ public static void main(String[] args) throws Exception
+ {
+   String zkConnectString = "localhost:2181";
+   String clusterName = "storage-integration-cluster";
+   String instanceName = "localhost_8905";
+   String file = null;
+   String stateModelValue = "MasterSlave";
+   int delay = 0;
+   boolean skipZeroArgs = true;// false is for dev testing
+   if (!skipZeroArgs || args.length > 0)
+   {
+     CommandLine cmd = processCommandLineArgs(args);
+     zkConnectString = cmd.getOptionValue(zkServer);
+     clusterName = cmd.getOptionValue(cluster);
+     String host = cmd.getOptionValue(hostAddress);
+     instanceName = host + "_" + Integer.parseInt(cmd.getOptionValue(hostPort));
+
+     file = cmd.getOptionValue(configFile);
+     if (file != null && !new File(file).exists())
+     {
+       System.err.println("static config file doesn't exist");
+       System.exit(1);
+     }
+
+     stateModelValue = cmd.getOptionValue(stateModel);
+     if (cmd.hasOption(transDelay))
+     {
+       String delayText = cmd.getOptionValue(transDelay);
+       try
+       {
+         delay = Integer.parseInt(delayText);
+       } catch (NumberFormatException e)
+       {
+         delay = -1; // reported below together with negative values
+       }
+       if (delay < 0)
+       {
+         System.err.println("Ignoring transDelay '" + delayText + "': expected a non-negative integer");
+         delay = 0;
+       }
+     }
+   }
+   // Espresso_driver.py will consume this
+   System.out.println("Starting Process with ZK:" + zkConnectString);
+
+   ExampleProcess process = new ExampleProcess(zkConnectString, clusterName, instanceName, file, stateModelValue, delay);
+   process.start();
+   Thread.currentThread().join();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
new file mode 100644
index 0000000..f090ace
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
@@ -0,0 +1,164 @@
+package org.apache.helix.examples;
+
+import java.io.File;
+
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+
+/**
+ * Ideal state json format file used in this example for CUSTOMIZED ideal state mode
+ * <p>
+ * <pre>
+ * {
+ * "id" : "TestDB",
+ * "mapFields" : {
+ * "TestDB_0" : {
+ * "localhost_12918" : "MASTER",
+ * "localhost_12919" : "SLAVE",
+ * "localhost_12920" : "SLAVE"
+ * },
+ * "TestDB_1" : {
+ * "localhost_12918" : "MASTER",
+ * "localhost_12919" : "SLAVE",
+ * "localhost_12920" : "SLAVE"
+ * },
+ * "TestDB_2" : {
+ * "localhost_12918" : "MASTER",
+ * "localhost_12919" : "SLAVE",
+ * "localhost_12920" : "SLAVE"
+ * },
+ * "TestDB_3" : {
+ * "localhost_12918" : "MASTER",
+ * "localhost_12919" : "SLAVE",
+ * "localhost_12920" : "SLAVE"
+ * }
+ * },
+ * "listFields" : {
+ * },
+ * "simpleFields" : {
+ * "IDEAL_STATE_MODE" : "CUSTOMIZED",
+ * "NUM_PARTITIONS" : "4",
+ * "REPLICAS" : "3",
+ * "STATE_MODEL_DEF_REF" : "MasterSlave",
+ * "STATE_MODEL_FACTORY_NAME" : "DEFAULT"
+ * }
+ * }
+ * </pre>
+ *
+ */
+
+public class IdealStateExample
+{
+
+ /**
+  * Sets up a cluster with three local participants and the "TestDB" resource,
+  * then starts a controller thread and three dummy participant threads.
+  *
+  * @param args zkAddress, clusterName, idealStateMode (AUTO, AUTO_REBALANCE or
+  *          CUSTOMIZED, in any letter case) and, for CUSTOMIZED only, idealStateJsonFile
+  * @throws Exception if the mode is not a known ideal state mode or the cluster setup fails
+  */
+ public static void main(String[] args) throws Exception
+ {
+   if (args.length < 3)
+   {
+     System.err.println("USAGE: IdealStateExample zkAddress clusterName idealStateMode (AUTO, AUTO_REBALANCE, or CUSTOMIZED) idealStateJsonFile (required for CUSTOMIZED mode)");
+     System.exit(1);
+   }
+
+   final String zkAddr = args[0];
+   final String clusterName = args[1];
+   // Locale.ROOT keeps the upper-casing independent of the JVM default locale; with a
+   // Turkish default, "customized" would not become "CUSTOMIZED" and valueOf would fail.
+   final String idealStateModeStr = args[2].toUpperCase(java.util.Locale.ROOT);
+   String idealStateJsonFile = null;
+   IdealStateModeProperty idealStateMode = IdealStateModeProperty.valueOf(idealStateModeStr);
+   if (idealStateMode == IdealStateModeProperty.CUSTOMIZED)
+   {
+     if (args.length < 4)
+     {
+       System.err.println("Missing idealStateJsonFile for CUSTOMIZED ideal state mode");
+       System.exit(1);
+     }
+     idealStateJsonFile = args[3];
+   }
+
+   // add cluster {clusterName}
+   ZkClient zkclient = new ZkClient(zkAddr,
+       ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+   ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+   admin.addCluster(clusterName, true);
+
+   // add MasterSlave state model definition
+   StateModelConfigGenerator generator = new StateModelConfigGenerator();
+   admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(generator.generateConfigForMasterSlave()));
+
+   // add 3 participants: "localhost:{12918, 12919, 12920}"
+   for (int i = 0; i < 3; i++)
+   {
+     int port = 12918 + i;
+     InstanceConfig config = new InstanceConfig("localhost_" + port);
+     config.setHostName("localhost");
+     config.setPort(Integer.toString(port));
+     config.setInstanceEnabled(true);
+     admin.addInstance(clusterName, config);
+   }
+
+   // add resource "TestDB" which has 4 partitions and uses MasterSlave state model
+   String resourceName = "TestDB";
+   if (idealStateMode == IdealStateModeProperty.AUTO
+       || idealStateMode == IdealStateModeProperty.AUTO_REBALANCE)
+   {
+     admin.addResource(clusterName, resourceName, 4, "MasterSlave", idealStateModeStr);
+
+     // rebalance resource "TestDB" using 3 replicas
+     admin.rebalance(clusterName, resourceName, 3);
+   }
+   else if (idealStateMode == IdealStateModeProperty.CUSTOMIZED)
+   {
+     admin.addIdealState(clusterName, resourceName, idealStateJsonFile);
+   }
+
+   // start helix controller
+   new Thread(new Runnable()
+   {
+     @Override
+     public void run()
+     {
+       try
+       {
+         HelixControllerMain.main(new String[] { "--zkSvr", zkAddr,
+             "--cluster", clusterName });
+       }
+       catch (Exception e)
+       {
+         e.printStackTrace();
+       }
+     }
+   }).start();
+
+   // start 3 dummy participants
+   for (int i = 0; i < 3; i++)
+   {
+     int port = 12918 + i;
+     final String instanceName = "localhost_" + port;
+     new Thread(new Runnable()
+     {
+       @Override
+       public void run()
+       {
+         // DummyParticipant.main blocks, hence one thread per participant
+         DummyParticipant.main(new String[] { zkAddr, clusterName, instanceName });
+       }
+     }).start();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
new file mode 100644
index 0000000..ec54970
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/** Factory for {@link LeaderStandbyStateModel}s whose transitions sleep for a configurable delay. */
+public class LeaderStandbyStateModelFactory extends StateModelFactory<StateModel> {
+  int _delay;
+
+  /** @param delay transition delay in milliseconds handed to every created model */
+  public LeaderStandbyStateModelFactory(int delay) {
+    _delay = delay;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String stateUnitKey) {
+    LeaderStandbyStateModel stateModel = new LeaderStandbyStateModel();
+    stateModel.setDelay(_delay);
+    return stateModel;
+  }
+
+  /** State model that prints the transition name and then sleeps for the delay. */
+  public static class LeaderStandbyStateModel extends StateModel {
+    int _transDelay = 0;
+
+    /** Sets the per-transition delay in milliseconds; values below zero are stored as 0. */
+    public void setDelay(int delay) {
+      _transDelay = Math.max(delay, 0);
+    }
+
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+      System.out.println("LeaderStandbyStateModel.onBecomeLeaderFromStandby()");
+      sleep();
+    }
+
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+      System.out.println("LeaderStandbyStateModel.onBecomeStandbyFromLeader()");
+      sleep();
+    }
+
+    /** Pauses for the configured delay; an interrupt ends the pause and stays set on the thread. */
+    private void sleep() {
+      try {
+        Thread.sleep(_transDelay);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
new file mode 100644
index 0000000..b8be32a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory for {@link MasterSlaveStateModel}s; every created model gets the
+ * factory's delay and the state unit key it was created for.
+ */
+@SuppressWarnings("rawtypes")
+public class MasterSlaveStateModelFactory extends StateModelFactory<StateModel> {
+  int _delay;
+
+  /** @param delay transition delay in milliseconds handed to every created model */
+  public MasterSlaveStateModelFactory(int delay) {
+    _delay = delay;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String stateUnitKey) {
+    MasterSlaveStateModel stateModel = new MasterSlaveStateModel();
+    stateModel.setDelay(_delay);
+    stateModel.setStateUnitKey(stateUnitKey);
+    return stateModel;
+  }
+
+  /** State model that prints each transition with its state unit key and then sleeps. */
+  public static class MasterSlaveStateModel extends StateModel {
+    int _transDelay = 0;
+    String stateUnitKey;
+
+    /** Returns the key set through {@link #setStateUnitKey(String)}, or null if none was set. */
+    public String getStateUnitKey() {
+      return stateUnitKey;
+    }
+
+    /** Sets the key that is appended to every transition message. */
+    public void setStateUnitKey(String stateUnitKey) {
+      this.stateUnitKey = stateUnitKey;
+    }
+
+    /** Sets the per-transition delay in milliseconds; values below zero are stored as 0. */
+    public void setDelay(int delay) {
+      _transDelay = Math.max(delay, 0);
+    }
+
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
+      System.out.println("MasterSlaveStateModel.onBecomeSlaveFromOffline() for " + stateUnitKey);
+      sleep();
+    }
+
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
+      System.out.println("MasterSlaveStateModel.onBecomeSlaveFromMaster() for " + stateUnitKey);
+      sleep();
+    }
+
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
+      System.out.println("MasterSlaveStateModel.onBecomeMasterFromSlave() for " + stateUnitKey);
+      sleep();
+    }
+
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
+      System.out.println("MasterSlaveStateModel.onBecomeOfflineFromSlave() for " + stateUnitKey);
+      sleep();
+    }
+
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      System.out.println("MasterSlaveStateModel.onBecomeDroppedFromOffline() for " + stateUnitKey);
+      sleep();
+    }
+
+    /** Pauses for the configured delay; an interrupt ends the pause early. */
+    private void sleep() {
+      try {
+        Thread.sleep(_transDelay);
+      } catch (InterruptedException e) {
+        // keep the interrupt visible to the code that invoked the transition
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..7b9bebe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory for {@link OnlineOfflineStateModel}s whose transitions sleep for a configurable delay.
+ */
+public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  int _delay;
+
+  /** @param delay transition delay in milliseconds handed to every created model */
+  public OnlineOfflineStateModelFactory(int delay) {
+    _delay = delay;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String stateUnitKey) {
+    OnlineOfflineStateModel stateModel = new OnlineOfflineStateModel();
+    stateModel.setDelay(_delay);
+    return stateModel;
+  }
+
+  /** State model that prints the transition name and then sleeps for the delay. */
+  public static class OnlineOfflineStateModel extends StateModel {
+    int _transDelay = 0;
+
+    /** Sets the per-transition delay in milliseconds; values below zero are stored as 0. */
+    public void setDelay(int delay) {
+      _transDelay = Math.max(delay, 0);
+    }
+
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      System.out.println("OnlineOfflineStateModel.onBecomeOnlineFromOffline()");
+      sleep();
+    }
+
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      System.out.println("OnlineOfflineStateModel.onBecomeOfflineFromOnline()");
+      sleep();
+    }
+
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      System.out.println("OnlineOfflineStateModel.onBecomeDroppedFromOffline()");
+      sleep();
+    }
+
+    /** Pauses for the configured delay; an interrupt ends the pause and stays set on the thread. */
+    private void sleep() {
+      try {
+        Thread.sleep(_transDelay);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/package-info.java b/helix-core/src/main/java/org/apache/helix/examples/package-info.java
new file mode 100644
index 0000000..0016cca
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Examples of using Helix cluster manager
+ *
+ */
+package org.apache.helix.examples;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
new file mode 100644
index 0000000..3816914
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import org.apache.log4j.Logger;
+
+/** Aggregation type named "accumulate" whose merge adds the incoming value to the existing one. */
+public class AccumulateAggregationType implements AggregationType
+{
+  private static final Logger logger = Logger.getLogger(AccumulateAggregationType.class);
+
+  public final static String TYPE_NAME = "accumulate";
+
+  @Override
+  public String getName()
+  {
+    return TYPE_NAME;
+  }
+
+  /**
+   * Parses both values as doubles and returns their sum as text; prevTimestamp is not used.
+   */
+  @Override
+  public String merge(String iv, String ev, long prevTimestamp)
+  {
+    return String.valueOf(Double.parseDouble(iv) + Double.parseDouble(ev));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
new file mode 100644
index 0000000..d331a0f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+/**
+ * Strategy for folding a newly reported stat value into the value that is
+ * already stored for that stat.
+ */
+public interface AggregationType
+{
+  // public abstract <T extends Object> T merge(T iv, T ev);
+
+  /** Separator between the parts of an aggregation type's string form. */
+  String DELIM = "#";
+
+  /**
+   * @return the string form that identifies this aggregation type
+   */
+  String getName();
+
+  /**
+   * Combines an incoming value with the existing value.
+   *
+   * @param incomingVal newly reported value
+   * @param existingVal value currently stored
+   * @param prevTimestamp timestamp associated with the existing value
+   * @return the merged value
+   */
+  String merge(String incomingVal, String existingVal, long prevTimestamp);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
new file mode 100644
index 0000000..3e43f25
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationTypeFactory.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+public class AggregationTypeFactory
+{
+ private static final Logger logger = Logger
+ .getLogger(AggregationTypeFactory.class);
+
+ public AggregationTypeFactory()
+ {
+ }
+
+ // TODO: modify this function so that it takes a single string, but can parse
+ // apart params from type
+ public static AggregationType getAggregationType(String input)
+ {
+ if (input == null)
+ {
+ logger.error("AggregationType name is null");
+ return null;
+ }
+ StringTokenizer tok = new StringTokenizer(input, AggregationType.DELIM);
+ String type = tok.nextToken();
+ int numParams = tok.countTokens();
+ String[] params = new String[numParams];
+ for (int i = 0; i < numParams; i++)
+ {
+ if (!tok.hasMoreTokens())
+ {
+ logger.error("Trying to parse non-existent params");
+ return null;
+ }
+ params[i] = tok.nextToken();
+ }
+
+ if (type.equals(AccumulateAggregationType.TYPE_NAME))
+ {
+ return new AccumulateAggregationType();
+ }
+ else if (type.equals(DecayAggregationType.TYPE_NAME))
+ {
+ if (params.length < 1)
+ {
+ logger
+ .error("DecayAggregationType must contain <decay weight> parameter");
+ return null;
+ }
+ return new DecayAggregationType(Double.parseDouble(params[0]));
+ }
+ else if (type.equals(WindowAggregationType.TYPE_NAME))
+ {
+ if (params.length < 1)
+ {
+ logger
+ .error("WindowAggregationType must contain <window size> parameter");
+ }
+ return new WindowAggregationType(Integer.parseInt(params[0]));
+ }
+ else
+ {
+ logger.error("Unknown AggregationType " + type);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
new file mode 100644
index 0000000..6cec7f8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/DecayAggregationType.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Aggregation type that blends the incoming value with the existing value,
+ * giving the existing value less weight the older it is.
+ */
+public class DecayAggregationType implements AggregationType
+{
+  private static final Logger logger =
+      Logger.getLogger(DecayAggregationType.class);
+
+  /** Name that identifies this aggregation type. */
+  public final static String TYPE_NAME = "decay";
+
+  // Fraction by which the weight of the existing value shrinks per minute.
+  double _decayFactor = 0.1;
+
+  /**
+   * @param df decay factor applied per minute of age
+   */
+  public DecayAggregationType(double df)
+  {
+    _decayFactor = df;
+  }
+
+  /**
+   * @return the type name followed by {@code DELIM} and the decay factor
+   */
+  @Override
+  public String getName()
+  {
+    return TYPE_NAME + DELIM + _decayFactor;
+  }
+
+  /**
+   * Computes a weighted average of the existing and incoming values. The
+   * weight of the existing value is {@code (1 - decayFactor)} raised to its
+   * age in minutes; the incoming value receives the remaining weight.
+   *
+   * @param iv incoming value, parsed as a double
+   * @param ev existing value, parsed as a double
+   * @param prevTimestamp time of the existing value in epoch milliseconds
+   * @return the blended value as a decimal string
+   * @throws NumberFormatException if either value is not a parsable double
+   */
+  @Override
+  public String merge(String iv, String ev, long prevTimestamp)
+  {
+    final double incoming = Double.parseDouble(iv);
+    final double existing = Double.parseDouble(ev);
+    final long now = System.currentTimeMillis();
+    final double ageMinutes = (now - prevTimestamp) / 60000.0;
+    // The more time has passed, the smaller the weight of the old value.
+    final double existingWeight = Math.pow((1 - _decayFactor), ageMinutes);
+    final double blended =
+        existingWeight * existing + (1 - existingWeight) * incoming;
+    return String.valueOf(blended);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
new file mode 100644
index 0000000..ae262eb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultHealthReportProvider.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Health report provider that publishes basic JVM and operating system
+ * counters of the local process.
+ */
+class DefaultHealthReportProvider extends HealthReportProvider
+{
+  private static final Logger _logger =
+      Logger.getLogger(DefaultHealthReportProvider.class);
+
+  // Keys of the entries in the health report.
+  public final static String _availableCPUs = "availableCPUs";
+  public final static String _freePhysicalMemory = "freePhysicalMemory";
+  public final static String _totalJvmMemory = "totalJvmMemory";
+  public final static String _freeJvmMemory = "freeJvmMemory";
+  public final static String _averageSystemLoad = "averageSystemLoad";
+
+  public DefaultHealthReportProvider()
+  {
+  }
+
+  /**
+   * Samples the current counters from {@link Runtime} and the operating
+   * system MXBean.
+   *
+   * @return a sorted map from counter key to its value rendered as a string
+   */
+  @Override
+  public Map<String, String> getRecentHealthReport()
+  {
+    final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+    final Runtime runtime = Runtime.getRuntime();
+    final long freeJvm = runtime.freeMemory();
+    final long totalJvm = runtime.totalMemory();
+    final int cpus = os.getAvailableProcessors();
+    final double loadAverage = os.getSystemLoadAverage();
+    // The lookup through com.sun.management.OperatingSystemMXBean is disabled,
+    // so free physical memory is always reported as this placeholder.
+    final long freePhysical = Long.MAX_VALUE;
+
+    final Map<String, String> report = new TreeMap<String, String>();
+    report.put(_availableCPUs, String.valueOf(cpus));
+    report.put(_freePhysicalMemory, String.valueOf(freePhysical));
+    report.put(_freeJvmMemory, String.valueOf(freeJvm));
+    report.put(_totalJvmMemory, String.valueOf(totalJvm));
+    report.put(_averageSystemLoad, String.valueOf(loadAverage));
+    return report;
+  }
+
+  /**
+   * @return a map with a single entry: the report name mapped to the result
+   *         of {@link #getRecentHealthReport()}
+   */
+  @Override
+  public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+  {
+    final Map<String, Map<String, String>> byReportName =
+        new HashMap<String, Map<String, String>>();
+    byReportName.put(getReportName(), getRecentHealthReport());
+    return byReportName;
+  }
+
+  /** Does nothing; this provider keeps no accumulated stats. */
+  @Override
+  public void resetStats()
+  {
+    // TODO Auto-generated method stub
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
new file mode 100644
index 0000000..cbc9938
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/DefaultPerfCounters.java
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Date;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class DefaultPerfCounters extends ZNRecord
+{
+ private static final Logger _logger = Logger
+ .getLogger(DefaultPerfCounters.class);
+
+ public final static String _availableCPUs = "availableCPUs";
+ public final static String _freePhysicalMemory = "freePhysicalMemory";
+ public final static String _totalJvmMemory = "totalJvmMemory";
+ public final static String _freeJvmMemory = "freeJvmMemory";
+ public final static String _averageSystemLoad = "averageSystemLoad";
+
+ public DefaultPerfCounters(String instanceName, long availableCPUs,
+ long freePhysicalMemory, long freeJvmMemory, long totalJvmMemory,
+ double averageSystemLoad)
+ {
+ super("DefaultPerfCounters");
+ setSimpleField("instanceName", instanceName);
+ setSimpleField("createTime", new Date().toString());
+
+ setSimpleField(_availableCPUs, "" + availableCPUs);
+ setSimpleField(_freePhysicalMemory, "" + freePhysicalMemory);
+ setSimpleField(_freeJvmMemory, "" + freeJvmMemory);
+ setSimpleField(_totalJvmMemory, "" + totalJvmMemory);
+ setSimpleField(_averageSystemLoad, "" + averageSystemLoad);
+ }
+
+ public long getAvailableCpus()
+ {
+ return getSimpleLongVal(_availableCPUs);
+ }
+
+ public double getAverageSystemLoad()
+ {
+ return getSimpleDoubleVal(_averageSystemLoad);
+ }
+
+ public long getTotalJvmMemory()
+ {
+ return getSimpleLongVal(_totalJvmMemory);
+ }
+
+ public long getFreeJvmMemory()
+ {
+ return getSimpleLongVal(_freeJvmMemory);
+ }
+
+ public long getFreePhysicalMemory()
+ {
+ return getSimpleLongVal(_freePhysicalMemory);
+ }
+
+ long getSimpleLongVal(String key)
+ {
+ String strVal = getSimpleField(key);
+ if (strVal == null)
+ {
+ return 0;
+ }
+ try
+ {
+ return Long.parseLong(strVal);
+ }
+ catch (Exception e)
+ {
+ _logger.warn(e);
+ return 0;
+ }
+ }
+
+ double getSimpleDoubleVal(String key)
+ {
+ String strVal = getSimpleField(key);
+ if (strVal == null)
+ {
+ return 0;
+ }
+ try
+ {
+ return Double.parseDouble(strVal);
+ }
+ catch (Exception e)
+ {
+ _logger.warn(e);
+ return 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
new file mode 100644
index 0000000..1725208
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthReportProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.Map;
+
+/**
+ * Base class for sources of health report data. Subclasses supply the
+ * report content and may override the report name and the per-partition
+ * report.
+ */
+public abstract class HealthReportProvider
+{
+  /** Report name used unless a subclass overrides {@link #getReportName()}. */
+  public static final String _defaultPerfCounters = "defaultPerfCounters";
+
+  /**
+   * @return the name of the report produced by this provider
+   */
+  public String getReportName()
+  {
+    return _defaultPerfCounters;
+  }
+
+  /**
+   * @return the most recent stats as a map from stat name to value
+   */
+  public abstract Map<String, String> getRecentHealthReport();
+
+  /**
+   * @return the most recent map-valued stats; this default implementation
+   *         returns {@code null}
+   */
+  public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+  {
+    return null;
+  }
+
+  /** Resets whatever stats the provider has accumulated. */
+  public abstract void resetStats();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
new file mode 100644
index 0000000..136ae63
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
@@ -0,0 +1,203 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ReadHealthDataStage;
+import org.apache.helix.controller.stages.StatsAggregationStage;
+import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
+import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Timer task that periodically runs the health stats aggregation pipeline
+ * (read health data, then aggregate stats). The pipeline is only invoked
+ * when the manager is the leader and the task is enabled.
+ */
+public class HealthStatsAggregationTask extends HelixTimerTask
+{
+  private static final Logger LOG = Logger.getLogger(HealthStatsAggregationTask.class);
+
+  /** Default for both the initial-delay bound and the period, in milliseconds. */
+  public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
+
+  // Guarded by "this"; null whenever the task is not started.
+  private Timer _timer;
+  private final HelixManager _manager;
+  private final Pipeline _healthStatsAggregationPipeline;
+  private final int _delay;
+  private final int _period;
+  private final ClusterAlertMBeanCollection _alertItemCollection;
+  private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
+      new HashMap<String, HelixStageLatencyMonitor>();
+
+  /**
+   * @param manager manager used for leadership checks, config and data access
+   * @param delay exclusive upper bound, in milliseconds, of the random
+   *          initial delay; a value of zero or less means no initial delay
+   * @param period time between pipeline runs, in milliseconds
+   */
+  public HealthStatsAggregationTask(HelixManager manager, int delay, int period)
+  {
+    _manager = manager;
+    _delay = delay;
+    _period = period;
+
+    // health stats pipeline
+    _healthStatsAggregationPipeline = new Pipeline();
+    _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
+    StatsAggregationStage statAggregationStage = new StatsAggregationStage();
+    _healthStatsAggregationPipeline.addStage(statAggregationStage);
+    _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
+
+    registerStageLatencyMonitor(_healthStatsAggregationPipeline);
+  }
+
+  /**
+   * Creates a task that uses {@link #DEFAULT_HEALTH_CHECK_LATENCY} for both
+   * delay and period.
+   */
+  public HealthStatsAggregationTask(HelixManager manager)
+  {
+    this(manager, DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
+  }
+
+  /**
+   * Creates one latency monitor per pipeline stage, keyed by stage name.
+   * A failure to create a monitor is logged and does not abort registration
+   * of the remaining stages.
+   */
+  private void registerStageLatencyMonitor(Pipeline pipeline)
+  {
+    for (Stage stage : pipeline.getStages())
+    {
+      String stgName = stage.getStageName();
+      if (_stageLatencyMonitorMap.containsKey(stgName))
+      {
+        LOG.error("StageLatencyMonitor for stage: " + stgName
+            + " already exists. Skip register it");
+        continue;
+      }
+
+      try
+      {
+        _stageLatencyMonitorMap.put(stgName,
+            new HelixStageLatencyMonitor(_manager.getClusterName(), stgName));
+      }
+      catch (Exception e)
+      {
+        LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
+      }
+    }
+  }
+
+  /**
+   * Removes the health records previously stored for this instance and
+   * schedules the task at a fixed rate after a random initial delay.
+   * Does nothing except log a warning if the task is already started.
+   */
+  @Override
+  public synchronized void start()
+  {
+    LOG.info("START HealthAggregationTask");
+
+    // synchronized like stop() and run(), so the check of _timer and its
+    // assignment below cannot interleave with a concurrent start or stop.
+    if (_timer != null)
+    {
+      LOG.warn("timer already started");
+      return;
+    }
+
+    // Remove all the previous health check values, if any
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    List<String> existingHealthRecordNames =
+        accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
+    // NOTE(review): presumably the accessor can return null when nothing is
+    // stored yet; the guard is harmless otherwise -- confirm.
+    if (existingHealthRecordNames != null)
+    {
+      for (String healthReportName : existingHealthRecordNames)
+      {
+        LOG.info("Removing old healthrecord " + healthReportName);
+        accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),
+            healthReportName));
+      }
+    }
+
+    // Random.nextInt(bound) rejects a bound <= 0, so treat it as "no delay".
+    int initialDelay = _delay > 0 ? new Random().nextInt(_delay) : 0;
+
+    // NOTE(review): this object is the scheduled task itself, and a
+    // java.util.TimerTask can be scheduled only once, so start() after
+    // stop() presumably fails with IllegalStateException -- confirm.
+    _timer = new Timer(true);
+    _timer.scheduleAtFixedRate(this, initialDelay, _period);
+  }
+
+  /**
+   * Cancels the timer and resets the alert collection and all stage latency
+   * monitors. Does nothing except log a warning if the task is not started.
+   */
+  @Override
+  public synchronized void stop()
+  {
+    LOG.info("Stop HealthAggregationTask");
+
+    if (_timer == null)
+    {
+      LOG.warn("timer already stopped");
+      return;
+    }
+
+    _timer.cancel();
+    _timer = null;
+    _alertItemCollection.reset();
+
+    for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
+    {
+      stgLatencyMonitor.reset();
+    }
+  }
+
+  /**
+   * Runs the aggregation pipeline once with a "healthChange" event. Returns
+   * without running it when the task is disabled or the manager is not the
+   * leader. Pipeline exceptions are logged and not propagated.
+   */
+  @Override
+  public synchronized void run()
+  {
+    if (!isEnabled())
+    {
+      LOG.info("HealthAggregationTask is disabled.");
+      return;
+    }
+
+    if (!_manager.isLeader())
+    {
+      LOG.error("Cluster manager: " + _manager.getInstanceName()
+          + " is not leader. Pipeline will not be invoked");
+      return;
+    }
+
+    try
+    {
+      ClusterEvent event = new ClusterEvent("healthChange");
+      event.addAttribute("helixmanager", _manager);
+      event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
+
+      _healthStatsAggregationPipeline.handle(event);
+      _healthStatsAggregationPipeline.finish();
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
+                e);
+    }
+  }
+
+  /**
+   * Reads the cluster-scope config key "healthChange.enabled".
+   *
+   * @return the configured value parsed as a boolean; {@code true} when the
+   *         key is not set or the manager has no config accessor
+   */
+  private boolean isEnabled()
+  {
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    if (configAccessor == null)
+    {
+      LOG.debug("File-based cluster manager doesn't support disable healthChange");
+      return true;
+    }
+
+    // zk-based cluster manager
+    ConfigScope scope =
+        new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
+    String isEnabled = configAccessor.get(scope, "healthChange.enabled");
+    return isEnabled == null || Boolean.parseBoolean(isEnabled);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
new file mode 100644
index 0000000..55e05fc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollector.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import org.apache.helix.ZNRecord;
+
+/**
+ * Collects health reports from registered {@link HealthReportProvider}s and
+ * publishes them.
+ */
+public interface ParticipantHealthReportCollector
+{
+  /**
+   * Registers a provider whose reports should be transmitted.
+   */
+  void addHealthReportProvider(HealthReportProvider provider);
+
+  /**
+   * Unregisters a previously added provider.
+   */
+  void removeHealthReportProvider(HealthReportProvider provider);
+
+  /**
+   * Publishes a single, already assembled health report.
+   */
+  void reportHealthReportMessage(ZNRecord healthReport);
+
+  /**
+   * Publishes the current reports of the registered providers.
+   */
+  void transmitHealthReports();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
new file mode 100644
index 0000000..3174ce5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
@@ -0,0 +1,186 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.model.HealthStat;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Collector that periodically gathers the reports of all registered
+ * {@link HealthReportProvider}s and writes each one as a health report
+ * property of this instance.
+ */
+public class ParticipantHealthReportCollectorImpl implements
+    ParticipantHealthReportCollector
+{
+  // Guarded by synchronizing on the list itself.
+  private final LinkedList<HealthReportProvider> _healthReportProviderList =
+      new LinkedList<HealthReportProvider>();
+  private Timer _timer;
+  private static final Logger _logger = Logger
+      .getLogger(ParticipantHealthReportCollectorImpl.class);
+  private final HelixManager _helixManager;
+  String _instanceName;
+
+  /** Period between transmissions and bound of the initial delay, in ms. */
+  public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
+
+  /**
+   * Creates the collector and registers a {@link DefaultHealthReportProvider}.
+   *
+   * @param helixManager manager whose data accessor is used for publishing
+   * @param instanceName instance under which reports are stored
+   */
+  public ParticipantHealthReportCollectorImpl(HelixManager helixManager,
+      String instanceName)
+  {
+    _helixManager = helixManager;
+    _instanceName = instanceName;
+    addDefaultHealthCheckInfoProvider();
+  }
+
+  private void addDefaultHealthCheckInfoProvider()
+  {
+    addHealthReportProvider(new DefaultHealthReportProvider());
+  }
+
+  /**
+   * Starts a daemon timer that transmits the health reports every
+   * {@link #DEFAULT_REPORT_LATENCY} milliseconds after a random initial
+   * delay below that value. Logs a warning if already started.
+   */
+  public void start()
+  {
+    if (_timer == null)
+    {
+      _timer = new Timer(true);
+      _timer.scheduleAtFixedRate(new HealthCheckInfoReportingTask(),
+          new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
+    }
+    else
+    {
+      _logger.warn("timer already started");
+    }
+  }
+
+  /**
+   * Registers a provider unless it is already registered. Failures are
+   * logged and not propagated.
+   */
+  @Override
+  public void addHealthReportProvider(HealthReportProvider provider)
+  {
+    try
+    {
+      synchronized (_healthReportProviderList)
+      {
+        if (!_healthReportProviderList.contains(provider))
+        {
+          _healthReportProviderList.add(provider);
+        }
+        else
+        {
+          _logger.warn("Skipping a duplicated HealthCheckInfoProvider");
+        }
+      }
+    }
+    catch (Exception e)
+    {
+      // Pass the exception as the throwable so the stack trace is logged.
+      _logger.error("Failed to add HealthReportProvider " + provider, e);
+    }
+  }
+
+  /**
+   * Unregisters a provider; logs a warning if it was not registered.
+   */
+  @Override
+  public void removeHealthReportProvider(HealthReportProvider provider)
+  {
+    synchronized (_healthReportProviderList)
+    {
+      // remove() reports whether the element was present, which avoids a
+      // separate contains() scan of the list.
+      if (!_healthReportProviderList.remove(provider))
+      {
+        _logger.warn("Skip removing a non-exist HealthCheckInfoProvider");
+      }
+    }
+  }
+
+  /**
+   * Writes the given record as the health report property keyed by this
+   * instance name and the record id.
+   */
+  @Override
+  public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate)
+  {
+    publish(healthCheckInfoUpdate);
+  }
+
+  /**
+   * Cancels the reporting timer; logs a warning if it is not running.
+   */
+  public void stop()
+  {
+    _logger.info("Stop HealthCheckInfoReportingTask");
+    if (_timer != null)
+    {
+      _timer.cancel();
+      _timer = null;
+    }
+    else
+    {
+      _logger.warn("timer already stopped");
+    }
+  }
+
+  /**
+   * For every registered provider: builds a record from its simple and
+   * partition reports, stamps it with the current time, publishes it and
+   * then resets the provider's stats. A failure for one provider is logged
+   * and does not stop the remaining providers from being processed.
+   */
+  @Override
+  public synchronized void transmitHealthReports()
+  {
+    synchronized (_healthReportProviderList)
+    {
+      for (HealthReportProvider provider : _healthReportProviderList)
+      {
+        try
+        {
+          Map<String, String> report = provider.getRecentHealthReport();
+          Map<String, Map<String, String>> partitionReport = provider
+              .getRecentPartitionHealthReport();
+          ZNRecord record = new ZNRecord(provider.getReportName());
+          if (report != null)
+          {
+            record.setSimpleFields(report);
+          }
+          if (partitionReport != null)
+          {
+            record.setMapFields(partitionReport);
+          }
+          record.setSimpleField(StatsHolder.TIMESTAMP_NAME, "" + System.currentTimeMillis());
+
+          publish(record);
+
+          // reset stats (for now just the partition stats)
+          provider.resetStats();
+        }
+        catch (Exception e)
+        {
+          _logger.error("Failed to transmit health report of provider " + provider, e);
+        }
+      }
+    }
+  }
+
+  /** Stores the record as the health report named by its id for this instance. */
+  private void publish(ZNRecord record)
+  {
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()),
+                         new HealthStat(record));
+  }
+
+  /** Timer task that triggers one transmission of all health reports. */
+  class HealthCheckInfoReportingTask extends TimerTask
+  {
+    @Override
+    public void run()
+    {
+      transmitHealthReports();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
new file mode 100644
index 0000000..64229a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/PerformanceHealthReportProvider.java
@@ -0,0 +1,161 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Health report provider for performance counters: a few instance-level
+ * stats plus named stats that are tracked per partition.
+ *
+ * Access to the per-partition stats is synchronized on this object, because
+ * a health report collector reads and resets them from its timer thread
+ * while other threads may be recording values. The public counter fields
+ * are not covered by that lock.
+ */
+public class PerformanceHealthReportProvider extends HealthReportProvider
+{
+
+  private static final Logger _logger = Logger
+      .getLogger(PerformanceHealthReportProvider.class);
+
+  // Keys of the instance-level stats in the health report.
+  public final static String _testStat = "testStat";
+  public final static String _readLatencyStat = "readLatencyStat";
+  public final static String _requestCountStat = "requestCountStat";
+  public final static String _partitionRequestCountStat = "partitionRequestCountStat";
+
+  /** Name of the report produced by this provider. */
+  public static final String _performanceCounters = "performanceCounters";
+
+  public int readLatencyCount = 0;
+  public double readLatencySum = 0;
+
+  public int requestCount = 0;
+
+  // private final Map<String, String> _partitionCountsMap = new HashMap<String,
+  // String>();
+
+  // stat name -> (partition name -> value); guarded by "this"
+  private final Map<String, HashMap<String, String>> _partitionStatMaps = new HashMap<String, HashMap<String, String>>();
+
+  public PerformanceHealthReportProvider()
+  {
+  }
+
+  /**
+   * @return a sorted map with the test stat, the average read latency
+   *         (sum divided by count, which is not a finite number while the
+   *         count is zero) and the request count
+   */
+  @Override
+  public Map<String, String> getRecentHealthReport()
+  {
+    long testStat = 10;
+
+    Map<String, String> result = new TreeMap<String, String>();
+
+    result.put(_testStat, "" + testStat);
+    result.put(_readLatencyStat, "" + readLatencySum
+        / (double) readLatencyCount);
+    result.put(_requestCountStat, "" + requestCount);
+
+    return result;
+  }
+
+  /**
+   * @return a sorted snapshot of all per-partition stats, keyed by stat
+   *         name; the returned maps are copies, so later updates or
+   *         {@link #resetStats()} do not change them
+   */
+  @Override
+  public synchronized Map<String, Map<String, String>> getRecentPartitionHealthReport()
+  {
+    Map<String, Map<String, String>> result = new TreeMap<String, Map<String, String>>();
+    for (Map.Entry<String, HashMap<String, String>> entry : _partitionStatMaps.entrySet())
+    {
+      // Copy the inner map so the caller never shares a map that another
+      // thread may still be updating.
+      result.put(entry.getKey(), new HashMap<String, String>(entry.getValue()));
+    }
+    return result;
+  }
+
+  /**
+   * Looks up the partition map of a stat.
+   *
+   * @param statName name of the stat
+   * @param createIfMissing whether to create and register an empty map when
+   *          the stat has none yet
+   * @return the map of the stat, or {@code null} if it does not exist and
+   *         createIfMissing is false
+   */
+  synchronized HashMap<String, String> getStatMap(String statName, boolean createIfMissing)
+  {
+    HashMap<String, String> statMap = _partitionStatMaps.get(statName);
+    if (statMap == null && createIfMissing)
+    {
+      statMap = new HashMap<String, String>();
+      _partitionStatMaps.put(statName, statMap);
+    }
+    return statMap;
+  }
+
+  // TODO:
+  // Currently participant is source of truth and updates ZK. We want ZK to be
+  // source of truth.
+  // Revise this approach the participant sends deltas of stats to controller
+  // (ZK?) and have controller do aggregation
+  // and update ZK. Make sure to wipe the participant between uploads.
+  String getPartitionStat(HashMap<String, String> partitionMap,
+      String partitionName)
+  {
+    return partitionMap.get(partitionName);
+  }
+
+  void setPartitionStat(HashMap<String, String> partitionMap,
+      String partitionName, String value)
+  {
+    partitionMap.put(partitionName, value);
+  }
+
+  /**
+   * Adds one to the value of a partition stat; a stat without a value for
+   * the partition starts at 1.0.
+   *
+   * @throws NumberFormatException if the stored value is not a parsable double
+   */
+  public synchronized void incrementPartitionStat(String statName, String partitionName)
+  {
+    HashMap<String, String> statMap = getStatMap(statName, true);
+    String currValStr = getPartitionStat(statMap, partitionName);
+    // Reuse the value that was just read instead of looking it up again.
+    double currVal = (currValStr == null) ? 1.0 : Double.parseDouble(currValStr) + 1;
+    setPartitionStat(statMap, partitionName, String.valueOf(currVal));
+  }
+
+  /**
+   * Sets the value of a partition stat, creating the stat if needed.
+   */
+  public synchronized void submitPartitionStat(String statName, String partitionName,
+      String value)
+  {
+    HashMap<String, String> statMap = getStatMap(statName, true);
+    setPartitionStat(statMap, partitionName, value);
+  }
+
+  /**
+   * @return the value stored for the partition, or {@code null} if the stat
+   *         or the partition has no value
+   */
+  public synchronized String getPartitionStat(String statName, String partitionName)
+  {
+    HashMap<String, String> statMap = getStatMap(statName, false);
+    return (statMap == null) ? null : statMap.get(partitionName);
+  }
+
+  /** Discards all per-partition stats; the counter fields are left untouched. */
+  @Override
+  public synchronized void resetStats()
+  {
+    _partitionStatMaps.clear();
+  }
+
+  /**
+   * @return {@link #_performanceCounters}
+   */
+  @Override
+  public String getReportName()
+  {
+    return _performanceCounters;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java b/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
new file mode 100644
index 0000000..9629fe9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/Stat.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Describes one health-check statistic: the operation and measurement it
+ * belongs to, the resource, partition and node it refers to, plus an optional
+ * return status, metric name and aggregation type name.
+ * <p>
+ * Two stats are equal when their partition, operation type, measurement type,
+ * resource and node match. Return status, metric name, aggregation type name
+ * and timestamp do not take part in {@link #equals(Object)} or
+ * {@link #hashCode()}.
+ */
+public class Stat
+{
+
+  private static final Logger _logger = Logger.getLogger(Stat.class);
+
+  // Key names for the individual stat attributes. Stat(Map) reads the
+  // OP_TYPE, MEASUREMENT_TYPE, RESOURCE_NAME, PARTITION_NAME and NODE_NAME
+  // entries.
+  public final static String OP_TYPE = "HTTP_OP";
+  public final static String MEASUREMENT_TYPE = "MEASUREMENT";
+  public final static String RESOURCE_NAME = "RESOURCE_NAME";
+  public final static String PARTITION_NAME = "PARTITION_NAME";
+  public final static String NODE_NAME = "NODE_NAME";
+  public final static String TIMESTAMP = "TIMESTAMP";
+  public final static String RETURN_STATUS = "RETURN_STATUS";
+  public final static String METRIC_NAME = "METRIC_NAME";
+  public final static String AGG_TYPE = "AGG_TYPE";
+
+  public String _opType;
+  public String _measurementType;
+  public String _resourceName;
+  public String _partitionName;
+  public String _nodeName;
+  public String _returnStatus;
+  public String _metricName;
+  public String _aggTypeName;
+  // Creation time in milliseconds since the epoch, stored as a string.
+  public String _timestamp;
+
+  /**
+   * Creates a stat without return status, metric name or aggregation type.
+   *
+   * @param opType operation type
+   * @param measurementType measurement type
+   * @param resourceName resource the stat refers to
+   * @param partitionName partition the stat refers to
+   * @param nodeName node the stat refers to
+   */
+  public Stat(String opType, String measurementType, String resourceName,
+      String partitionName, String nodeName)
+  {
+    this(opType, measurementType, resourceName, partitionName, nodeName, null,
+        null, null);
+  }
+
+  /**
+   * Creates a fully specified stat and stamps it with the current time.
+   *
+   * @param opType operation type
+   * @param measurementType measurement type
+   * @param resourceName resource the stat refers to
+   * @param partitionName partition the stat refers to
+   * @param nodeName node the stat refers to
+   * @param returnStatus return status, may be null
+   * @param metricName metric name, may be null
+   * @param aggType aggregation type whose name is recorded, may be null
+   */
+  public Stat(String opType, String measurementType, String resourceName,
+      String partitionName, String nodeName, String returnStatus,
+      String metricName, AggregationType aggType)
+  {
+    this._opType = opType;
+    this._measurementType = measurementType;
+    this._resourceName = resourceName;
+    this._partitionName = partitionName;
+    this._nodeName = nodeName;
+    this._returnStatus = returnStatus;
+    this._metricName = metricName;
+    this._aggTypeName = (aggType == null) ? null : aggType.getName();
+
+    _timestamp = String.valueOf(System.currentTimeMillis());
+  }
+
+  /**
+   * Creates a stat from a map keyed by the attribute-name constants of this
+   * class and stamps it with the current time. Attributes missing from the map
+   * are left null.
+   *
+   * @param in map holding the stat attributes, must not be null
+   */
+  public Stat(Map<String, String> in)
+  {
+    // NOTE(review): RETURN_STATUS, METRIC_NAME, AGG_TYPE and TIMESTAMP entries
+    // are not read from the map - confirm that this is intentional.
+    _opType = in.get(OP_TYPE);
+    _measurementType = in.get(MEASUREMENT_TYPE);
+    _resourceName = in.get(RESOURCE_NAME);
+    _partitionName = in.get(PARTITION_NAME);
+    _nodeName = in.get(NODE_NAME);
+    _timestamp = String.valueOf(System.currentTimeMillis());
+  }
+
+  /**
+   * Records the name of the given aggregation type.
+   *
+   * @param aggType aggregation type; null clears the recorded name, matching
+   *          what the constructor does for a null aggregation type
+   */
+  public void setAggType(AggregationType aggType)
+  {
+    this._aggTypeName = (aggType == null) ? null : aggType.getName();
+  }
+
+  /**
+   * Compares partition, operation type, measurement type, resource and node.
+   * Fields that are null on both sides count as matching.
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj)
+    {
+      return true;
+    }
+    if (!(obj instanceof Stat))
+    {
+      return false;
+    }
+    Stat other = (Stat) obj;
+    return sameValue(_partitionName, other._partitionName)
+        && sameValue(_opType, other._opType)
+        && sameValue(_measurementType, other._measurementType)
+        && sameValue(_resourceName, other._resourceName)
+        && sameValue(_nodeName, other._nodeName);
+  }
+
+  // Null-safe string comparison; the identity fields are public and the map
+  // constructor may leave them null.
+  private static boolean sameValue(String left, String right)
+  {
+    return (left == null) ? (right == null) : left.equals(right);
+  }
+
+  /**
+   * Hashes the same five fields that {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode()
+  {
+    // String concatenation renders null fields as "null", so equal stats
+    // (including ones with null fields) always hash alike.
+    return (_partitionName + _opType + _measurementType + _resourceName + _nodeName)
+        .hashCode();
+  }
+
+  /**
+   * Placeholder; currently does nothing.
+   *
+   * @param value ignored
+   */
+  public void addAlert(long value)
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  /**
+   * @return node, resource, partition, operation type, measurement type,
+   *         return status, metric name and aggregation type name joined by
+   *         dots
+   */
+  @Override
+  public String toString()
+  {
+    return _nodeName + "." + _resourceName + "." + _partitionName + "."
+        + _opType + "." + _measurementType + "." + _returnStatus + "."
+        + _metricName + "." + _aggTypeName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java b/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
new file mode 100644
index 0000000..e4ee622
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/StatHealthReportProvider.java
@@ -0,0 +1,175 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Health report provider that keeps, per stat name, the latest value and the
+ * timestamp it was written with, and exposes them as a partition health
+ * report.
+ */
+public class StatHealthReportProvider extends HealthReportProvider
+{
+
+  private static final Logger _logger = Logger
+      .getLogger(StatHealthReportProvider.class);
+
+  public static final String REPORT_NAME = "ParticipantStats";
+  public String _reportName = REPORT_NAME;
+
+  // Keys of the per-stat maps returned by getRecentPartitionHealthReport().
+  public static final String STAT_VALUE = "value";
+  public static final String TIMESTAMP = "timestamp";
+
+  public int readLatencyCount = 0;
+  public double readLatencySum = 0;
+
+  public int requestCount = 0;
+
+  // Both maps are keyed by the stat name passed to writeStat().
+  private final ConcurrentHashMap<String, String> _statsToValues = new ConcurrentHashMap<String, String>();
+  private final ConcurrentHashMap<String, String> _statsToTimestamps = new ConcurrentHashMap<String, String>();
+
+  public StatHealthReportProvider()
+  {
+  }
+
+  /**
+   * @return always null; this provider only produces the partition report
+   */
+  @Override
+  public Map<String, String> getRecentHealthReport()
+  {
+    return null;
+  }
+
+  /**
+   * Builds a snapshot of all recorded stats.
+   *
+   * @return map from stat name to a map holding the stat's value under
+   *         {@link #STAT_VALUE} and its timestamp under {@link #TIMESTAMP}
+   */
+  // TODO: function is misnamed, but return type is what I want
+  @Override
+  public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+  {
+    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
+    for (Map.Entry<String, String> entry : _statsToValues.entrySet())
+    {
+      String stat = entry.getKey();
+      Map<String, String> currStat = new HashMap<String, String>();
+      currStat.put(TIMESTAMP, _statsToTimestamps.get(stat));
+      currStat.put(STAT_VALUE, entry.getValue());
+      result.put(stat, currStat);
+    }
+    return result;
+  }
+
+  // Maps a Stat to the string key used by the internal maps.
+  // NOTE(review): the maps are keyed by the stat names given to writeStat();
+  // Stat.toString() is assumed to be that name - confirm against callers.
+  // Previously the Stat object itself was used as the key, which can never
+  // match a String key.
+  private static String statKey(Stat inStat)
+  {
+    return inStat.toString();
+  }
+
+  /**
+   * @param inStat stat to look for, may be null
+   * @return true if a value is recorded under the stat's name
+   */
+  public boolean contains(Stat inStat)
+  {
+    return inStat != null && _statsToValues.containsKey(statKey(inStat));
+  }
+
+  /**
+   * @return the live set of recorded stat names
+   */
+  public Set<String> keySet()
+  {
+    return _statsToValues.keySet();
+  }
+
+  /**
+   * @param inStat stat to look up, may be null
+   * @return the value recorded under the stat's name, or null if there is none
+   */
+  public String getStatValue(Stat inStat)
+  {
+    return (inStat == null) ? null : _statsToValues.get(statKey(inStat));
+  }
+
+  /**
+   * @param inStat stat to look up
+   * @return the timestamp recorded under the stat's name, parsed as a long
+   * @throws NumberFormatException if no timestamp is recorded for the stat or
+   *           the recorded timestamp is not a valid long
+   */
+  public long getStatTimestamp(Stat inStat)
+  {
+    String timestamp = (inStat == null) ? null : _statsToTimestamps
+        .get(statKey(inStat));
+    // Long.parseLong(null) raises NumberFormatException, so a missing entry
+    // fails with the same exception type as a malformed one.
+    return Long.parseLong(timestamp);
+  }
+
+  /**
+   * Records a value and its timestamp under the given stat name, replacing
+   * whatever was recorded before.
+   *
+   * @param statName name of the stat, must not be null
+   * @param val value to record, must not be null
+   * @param timestamp timestamp to record, must not be null
+   */
+  public void writeStat(String statName, String val, String timestamp)
+  {
+    _statsToValues.put(statName, val);
+    _statsToTimestamps.put(statName, timestamp);
+  }
+
+  /**
+   * Adds one to the numeric value recorded under the given stat name and
+   * updates its timestamp. A stat without a recorded value counts as zero, so
+   * its first increment yields 1.0.
+   *
+   * @param statName name of the stat to increment
+   * @param timestamp timestamp to record with the new value
+   * @throws NumberFormatException if the recorded value cannot be parsed as a
+   *           double
+   */
+  public void incrementStat(String statName, String timestamp)
+  {
+    String current = _statsToValues.get(statName);
+    double incremented = (current == null) ? 1.0
+        : Double.parseDouble(current) + 1;
+    writeStat(statName, String.valueOf(incremented), timestamp);
+  }
+
+  /**
+   * @return number of stats that currently have a recorded value
+   */
+  public int size()
+  {
+    return _statsToValues.size();
+  }
+
+  /**
+   * Discards all recorded values and timestamps.
+   */
+  public void resetStats()
+  {
+    _statsToValues.clear();
+    _statsToTimestamps.clear();
+  }
+
+  /**
+   * @param name new report name
+   */
+  public void setReportName(String name)
+  {
+    _reportName = name;
+  }
+
+  /**
+   * @return the report name, {@link #REPORT_NAME} unless changed
+   */
+  public String getReportName()
+  {
+    return _reportName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
new file mode 100644
index 0000000..3db4382
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/WindowAggregationType.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.healthcheck;
+
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+public class WindowAggregationType implements AggregationType
+{
+
+ private static final Logger logger = Logger
+ .getLogger(WindowAggregationType.class);
+
+ public final String WINDOW_DELIM = "#";
+
+ public final static String TYPE_NAME = "window";
+
+ int _windowSize = 1;
+
+ public WindowAggregationType(int ws)
+ {
+ super();
+ _windowSize = ws;
+ }
+
+ @Override
+ public String getName()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(TYPE_NAME);
+ sb.append(DELIM);
+ sb.append(_windowSize);
+ return sb.toString();
+ }
+
+ @Override
+ public String merge(String incomingVal, String existingVal, long prevTimestamp)
+ {
+ String[] windowVals;
+ if (existingVal == null)
+ {
+ return incomingVal;
+ }
+ else
+ {
+ windowVals = existingVal.split(WINDOW_DELIM);
+ int currLength = windowVals.length;
+ // window not full
+ if (currLength < _windowSize)
+ {
+ return existingVal + WINDOW_DELIM + incomingVal;
+ }
+ // evict oldest
+ else
+ {
+ int firstDelim = existingVal.indexOf(WINDOW_DELIM);
+ return existingVal.substring(firstDelim + 1) + WINDOW_DELIM
+ + incomingVal;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java b/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
new file mode 100644
index 0000000..3aa4e53
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix health check classes
+ *
+ */
+package org.apache.helix.healthcheck;
\ No newline at end of file