You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC
[12/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapProcess.java b/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapProcess.java
deleted file mode 100644
index f9a9530..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapProcess.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.examples;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Date;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-/**
- * This process does little more than handling the state transition messages.
- * This is generally the case when the server needs to bootstrap when it comes
- * up.<br>
- * Flow for a typical Master-slave state model<br>
- * <ul>
- * <li>Gets OFFLINE-SLAVE transition</li>
- * <li>Figure out if it has any data and how old it is for the SLAVE partition</li>
- * <li>If the data is fresh enough it can probably catch up from the replication
- * stream of the master</li>
- * <li>If not, then it can use the messaging service provided by cluster manager
- * to talk other nodes to figure out if they have any backup</li>
- * </li>
- * <li>Once it gets a response from other nodes in the cluster the process can
- * decide which back up it wants to use to bootstrap</li>
- * </ul>
- *
- * @author kgopalak
- *
- */
-public class BootstrapProcess
-{
- static final String REQUEST_BOOTSTRAP_URL = "REQUEST_BOOTSTRAP_URL";
- public static final String zkServer = "zkSvr";
- public static final String cluster = "cluster";
- public static final String hostAddress = "host";
- public static final String hostPort = "port";
- public static final String relayCluster = "relayCluster";
- public static final String help = "help";
- public static final String configFile = "configFile";
- public static final String stateModel = "stateModelType";
- public static final String transDelay = "transDelay";
-
- private final String zkConnectString;
- private final String clusterName;
- private final String instanceName;
- private final String stateModelType;
- private HelixManager manager;
-
-// private StateMachineEngine genericStateMachineHandler;
-
- private String _file = null;
- private StateModelFactory<StateModel> stateModelFactory;
- private final int delay;
-
- public BootstrapProcess(String zkConnectString, String clusterName,
- String instanceName, String file, String stateModel, int delay)
- {
- this.zkConnectString = zkConnectString;
- this.clusterName = clusterName;
- this.instanceName = instanceName;
- this._file = file;
- stateModelType = stateModel;
- this.delay = delay;
- }
-
- public void start() throws Exception
- {
- if (_file == null)
- {
- manager = HelixManagerFactory.getZKHelixManager(clusterName,
- instanceName,
- InstanceType.PARTICIPANT,
- zkConnectString);
-
- }
- else
- {
- manager = HelixManagerFactory.getStaticFileHelixManager(clusterName,
- instanceName,
- InstanceType.PARTICIPANT,
- _file);
-
- }
- stateModelFactory = new BootstrapHandler();
-// genericStateMachineHandler = new StateMachineEngine();
-// genericStateMachineHandler.registerStateModelFactory("MasterSlave", stateModelFactory);
-
- StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
-
- manager.getMessagingService().registerMessageHandlerFactory(
- MessageType.STATE_TRANSITION.toString(), stateMach);
- manager.getMessagingService().registerMessageHandlerFactory(
- MessageType.USER_DEFINE_MSG.toString(),
- new CustomMessageHandlerFactory());
- manager.connect();
- if (_file != null)
- {
- ClusterStateVerifier.verifyFileBasedClusterStates(_file, instanceName,
- stateModelFactory);
-
- }
- }
-
- public static class CustomMessageHandlerFactory implements
- MessageHandlerFactory
- {
-
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
-
- return new CustomMessageHandler(message, context);
- }
-
- @Override
- public String getMessageType()
- {
- return MessageType.USER_DEFINE_MSG.toString();
- }
-
- @Override
- public void reset()
- {
-
- }
-
- static class CustomMessageHandler extends MessageHandler
- {
-
- public CustomMessageHandler(Message message, NotificationContext context)
- {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- String hostName;
- HelixTaskResult result = new HelixTaskResult();
- try
- {
- hostName = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e)
- {
- hostName = "UNKNOWN";
- }
- String port = "2134";
- String msgSubType = _message.getMsgSubType();
- if (msgSubType.equals(REQUEST_BOOTSTRAP_URL))
- {
- result.getTaskResultMap().put(
- "BOOTSTRAP_URL",
- "http://" + hostName + ":" + port
- + "/getFile?path=/data/bootstrap/"
- + _message.getResourceName() + "/"
- + _message.getPartitionName() + ".tar");
-
- result.getTaskResultMap().put(
- "BOOTSTRAP_TIME",
- ""+new Date().getTime());
- }
-
- result.setSuccess(true);
- return result;
- }
-
- @Override
- public void onError( Exception e, ErrorCode code, ErrorType type)
- {
- e.printStackTrace();
- }
- }
- }
-
-
- @SuppressWarnings("static-access")
- private static Options constructCommandLineOptions()
- {
- Option helpOption = OptionBuilder.withLongOpt(help)
- .withDescription("Prints command-line options info").create();
-
- Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
- .withDescription("Provide zookeeper address").create();
- zkServerOption.setArgs(1);
- zkServerOption.setRequired(true);
- zkServerOption.setArgName("ZookeeperServerAddress(Required)");
-
- Option clusterOption = OptionBuilder.withLongOpt(cluster)
- .withDescription("Provide cluster name").create();
- clusterOption.setArgs(1);
- clusterOption.setRequired(true);
- clusterOption.setArgName("Cluster name (Required)");
-
- Option hostOption = OptionBuilder.withLongOpt(hostAddress)
- .withDescription("Provide host name").create();
- hostOption.setArgs(1);
- hostOption.setRequired(true);
- hostOption.setArgName("Host name (Required)");
-
- Option portOption = OptionBuilder.withLongOpt(hostPort)
- .withDescription("Provide host port").create();
- portOption.setArgs(1);
- portOption.setRequired(true);
- portOption.setArgName("Host port (Required)");
-
- Option stateModelOption = OptionBuilder.withLongOpt(stateModel)
- .withDescription("StateModel Type").create();
- stateModelOption.setArgs(1);
- stateModelOption.setRequired(true);
- stateModelOption.setArgName("StateModel Type (Required)");
-
- // add an option group including either --zkSvr or --configFile
- Option fileOption = OptionBuilder.withLongOpt(configFile)
- .withDescription("Provide file to read states/messages").create();
- fileOption.setArgs(1);
- fileOption.setRequired(true);
- fileOption.setArgName("File to read states/messages (Optional)");
-
- Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
- .withDescription("Provide state trans delay").create();
- transDelayOption.setArgs(1);
- transDelayOption.setRequired(false);
- transDelayOption.setArgName("Delay time in state transition, in MS");
-
- OptionGroup optionGroup = new OptionGroup();
- optionGroup.addOption(zkServerOption);
- optionGroup.addOption(fileOption);
-
- Options options = new Options();
- options.addOption(helpOption);
- // options.addOption(zkServerOption);
- options.addOption(clusterOption);
- options.addOption(hostOption);
- options.addOption(portOption);
- options.addOption(stateModelOption);
- options.addOption(transDelayOption);
-
- options.addOptionGroup(optionGroup);
-
- return options;
- }
-
- public static void printUsage(Options cliOptions)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("java " + BootstrapProcess.class.getName(), cliOptions);
- }
-
- public static CommandLine processCommandLineArgs(String[] cliArgs)
- throws Exception
- {
- CommandLineParser cliParser = new GnuParser();
- Options cliOptions = constructCommandLineOptions();
- try
- {
- return cliParser.parse(cliOptions, cliArgs);
- } catch (ParseException pe)
- {
- System.err
- .println("CommandLineClient: failed to parse command-line options: "
- + pe.toString());
- printUsage(cliOptions);
- System.exit(1);
- }
- return null;
- }
-
- public static void main(String[] args) throws Exception
- {
- String zkConnectString = "localhost:2181";
- String clusterName = "storage-integration-cluster";
- String instanceName = "localhost_8905";
- String file = null;
- String stateModelValue = "MasterSlave";
- int delay = 0;
- boolean skipZeroArgs = true;// false is for dev testing
- if (!skipZeroArgs || args.length > 0)
- {
- CommandLine cmd = processCommandLineArgs(args);
- zkConnectString = cmd.getOptionValue(zkServer);
- clusterName = cmd.getOptionValue(cluster);
-
- String host = cmd.getOptionValue(hostAddress);
- String portString = cmd.getOptionValue(hostPort);
- int port = Integer.parseInt(portString);
- instanceName = host + "_" + port;
-
- file = cmd.getOptionValue(configFile);
- if (file != null)
- {
- File f = new File(file);
- if (!f.exists())
- {
- System.err.println("static config file doesn't exist");
- System.exit(1);
- }
- }
-
- stateModelValue = cmd.getOptionValue(stateModel);
- if (cmd.hasOption(transDelay))
- {
- try
- {
- delay = Integer.parseInt(cmd.getOptionValue(transDelay));
- if (delay < 0)
- {
- throw new Exception("delay must be positive");
- }
- } catch (Exception e)
- {
- e.printStackTrace();
- delay = 0;
- }
- }
- }
- // Espresso_driver.py will consume this
- System.out.println("Starting Process with ZK:" + zkConnectString);
-
- BootstrapProcess process = new BootstrapProcess(zkConnectString,
- clusterName, instanceName, file, stateModelValue, delay);
-
- process.start();
- Thread.currentThread().join();
- }
-}
-
-class BootstrapReplyHandler extends AsyncCallback
-{
-
- public BootstrapReplyHandler()
- {
- }
-
- private String bootstrapUrl;
- private String bootstrapTime;
-
- @Override
- public void onTimeOut()
- {
- System.out.println("Timed out");
- }
-
- public String getBootstrapUrl()
- {
- return bootstrapUrl;
- }
-
- public String getBootstrapTime()
- {
- return bootstrapTime;
- }
-
- @Override
- public void onReplyMessage(Message message)
- {
- String time = message.getResultMap().get("BOOTSTRAP_TIME");
- if (bootstrapTime == null || time.compareTo(bootstrapTime) > -1)
- {
- bootstrapTime = message.getResultMap().get("BOOTSTRAP_TIME");
- bootstrapUrl = message.getResultMap().get("BOOTSTRAP_URL");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/DummyParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/DummyParticipant.java b/helix-core/src/main/java/com/linkedin/helix/examples/DummyParticipant.java
deleted file mode 100644
index 7dfa38a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/DummyParticipant.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package com.linkedin.helix.examples;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.participant.statemachine.StateModelInfo;
-import com.linkedin.helix.participant.statemachine.Transition;
-
-public class DummyParticipant
-{
- // dummy master-slave state model
- @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
- public static class DummyMSStateModel extends StateModel
- {
- @Transition(to = "SLAVE", from = "OFFLINE")
- public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
- {
- String partitionName = message.getPartitionName();
- String instanceName = message.getTgtName();
- System.out.println(instanceName + " becomes SLAVE from OFFLINE for " + partitionName);
- }
-
- @Transition(to = "MASTER", from = "SLAVE")
- public void onBecomeMasterFromSlave(Message message, NotificationContext context)
- {
- String partitionName = message.getPartitionName();
- String instanceName = message.getTgtName();
- System.out.println(instanceName + " becomes MASTER from SLAVE for " + partitionName);
- }
-
- @Transition(to = "SLAVE", from = "MASTER")
- public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
- {
- String partitionName = message.getPartitionName();
- String instanceName = message.getTgtName();
- System.out.println(instanceName + " becomes SLAVE from MASTER for " + partitionName);
- }
-
- @Transition(to = "OFFLINE", from = "SLAVE")
- public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
- {
- String partitionName = message.getPartitionName();
- String instanceName = message.getTgtName();
- System.out.println(instanceName + " becomes OFFLINE from SLAVE for " + partitionName);
- }
-
- @Transition(to = "DROPPED", from = "OFFLINE")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
- {
- String partitionName = message.getPartitionName();
- String instanceName = message.getTgtName();
- System.out.println(instanceName + " becomes DROPPED from OFFLINE for " + partitionName);
- }
-
- @Transition(to = "OFFLINE", from = "ERROR")
- public void onBecomeOfflineFromError(Message message, NotificationContext context)
- {
- String partitionName = message.getPartitionName();
- String instanceName = message.getTgtName();
- System.out.println(instanceName + " becomes OFFLINE from ERROR for " + partitionName);
- }
-
- @Override
- public void reset()
- {
- System.out.println("Default MockMSStateModel.reset() invoked");
- }
- }
-
- // dummy master slave state model factory
- public static class DummyMSModelFactory extends StateModelFactory<DummyMSStateModel>
- {
- @Override
- public DummyMSStateModel createNewStateModel(String partitionName)
- {
- DummyMSStateModel model = new DummyMSStateModel();
- return model;
- }
- }
-
- public static void main(String[] args)
- {
- if (args.length < 3)
- {
- System.err.println("USAGE: DummyParticipant zkAddress clusterName instanceName");
- System.exit(1);
- }
-
- String zkAddr = args[0];
- String clusterName = args[1];
- String instanceName = args[2];
-
- HelixManager manager = null;
- try
- {
- manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
- InstanceType.PARTICIPANT, zkAddr);
-
- StateMachineEngine stateMach = manager.getStateMachineEngine();
- DummyMSModelFactory msModelFactory = new DummyMSModelFactory();
- stateMach.registerStateModelFactory("MasterSlave", msModelFactory);
-
- manager.connect();
-
- Thread.currentThread().join();
- } catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally
- {
- if (manager != null)
- {
- manager.disconnect();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/ExampleHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/ExampleHelper.java b/helix-core/src/main/java/com/linkedin/helix/examples/ExampleHelper.java
deleted file mode 100644
index e16f2b2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/ExampleHelper.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package com.linkedin.helix.examples;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.commons.io.FileUtils;
-
-public class ExampleHelper
-{
-
-
- public static ZkServer startZkServer(String zkAddr)
- {
- System.out.println("Start zookeeper at " + zkAddr + " in thread "
- + Thread.currentThread().getName());
-
- String zkDir = zkAddr.replace(':', '_');
- final String logDir = "/tmp/" + zkDir + "/logs";
- final String dataDir = "/tmp/" + zkDir + "/dataDir";
- try
- {
- FileUtils.deleteDirectory(new File(dataDir));
- FileUtils.deleteDirectory(new File(logDir));
- } catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
- @Override
- public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient zkClient)
- {
- // do nothing
- }
- };
-
- int port = Integer.parseInt(zkAddr.substring(zkAddr.lastIndexOf(':') + 1));
- ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
- zkServer.start();
-
- return zkServer;
- }
-
- public static void stopZkServer(ZkServer zkServer)
- {
- if (zkServer != null)
- {
- zkServer.shutdown();
- System.out.println("Shut down zookeeper at port " + zkServer.getPort()
- + " in thread " + Thread.currentThread().getName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/ExampleProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/ExampleProcess.java b/helix-core/src/main/java/com/linkedin/helix/examples/ExampleProcess.java
deleted file mode 100644
index 58e22f6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/ExampleProcess.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.examples;
-
-import java.io.File;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class ExampleProcess
-{
-
- public static final String zkServer = "zkSvr";
- public static final String cluster = "cluster";
- public static final String hostAddress = "host";
- public static final String hostPort = "port";
- public static final String relayCluster = "relayCluster";
- public static final String help = "help";
- public static final String configFile = "configFile";
- public static final String stateModel = "stateModelType";
- public static final String transDelay = "transDelay";
-
- private final String zkConnectString;
- private final String clusterName;
- private final String instanceName;
- private final String stateModelType;
- private HelixManager manager;
-
-// private StateMachineEngine genericStateMachineHandler;
-
- private String _file = null;
- private StateModelFactory<StateModel> stateModelFactory;
- private final int delay;
-
- public ExampleProcess(String zkConnectString, String clusterName,
- String instanceName, String file, String stateModel, int delay)
- {
- this.zkConnectString = zkConnectString;
- this.clusterName = clusterName;
- this.instanceName = instanceName;
- this._file = file;
- stateModelType = stateModel;
- this.delay = delay;
- }
-
- public void start() throws Exception
- {
- if (_file == null)
- {
- manager = HelixManagerFactory.getZKHelixManager(clusterName,
- instanceName,
- InstanceType.PARTICIPANT,
- zkConnectString);
-
- }
- else
- {
- manager = HelixManagerFactory.getStaticFileHelixManager(clusterName,
- instanceName,
- InstanceType.PARTICIPANT,
- _file);
-
- }
-
- if ("MasterSlave".equalsIgnoreCase(stateModelType))
- {
- stateModelFactory = new MasterSlaveStateModelFactory(delay);
- } else if ("OnlineOffline".equalsIgnoreCase(stateModelType))
- {
- stateModelFactory = new OnlineOfflineStateModelFactory(delay);
- } else if ("LeaderStandby".equalsIgnoreCase(stateModelType))
- {
- stateModelFactory = new LeaderStandbyStateModelFactory(delay);
- }
-// genericStateMachineHandler = new StateMachineEngine();
-// genericStateMachineHandler.registerStateModelFactory(stateModelType, stateModelFactory);
-
- StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory(stateModelType, stateModelFactory);
- manager.connect();
- manager.getMessagingService().registerMessageHandlerFactory(
- MessageType.STATE_TRANSITION.toString(), stateMach);
- if (_file != null)
- {
- ClusterStateVerifier.verifyFileBasedClusterStates(_file, instanceName,
- stateModelFactory);
-
- }
- }
-
- @SuppressWarnings("static-access")
- private static Options constructCommandLineOptions()
- {
- Option helpOption = OptionBuilder.withLongOpt(help)
- .withDescription("Prints command-line options info").create();
-
- Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
- .withDescription("Provide zookeeper address").create();
- zkServerOption.setArgs(1);
- zkServerOption.setRequired(true);
- zkServerOption.setArgName("ZookeeperServerAddress(Required)");
-
- Option clusterOption = OptionBuilder.withLongOpt(cluster)
- .withDescription("Provide cluster name").create();
- clusterOption.setArgs(1);
- clusterOption.setRequired(true);
- clusterOption.setArgName("Cluster name (Required)");
-
- Option hostOption = OptionBuilder.withLongOpt(hostAddress)
- .withDescription("Provide host name").create();
- hostOption.setArgs(1);
- hostOption.setRequired(true);
- hostOption.setArgName("Host name (Required)");
-
- Option portOption = OptionBuilder.withLongOpt(hostPort)
- .withDescription("Provide host port").create();
- portOption.setArgs(1);
- portOption.setRequired(true);
- portOption.setArgName("Host port (Required)");
-
- Option stateModelOption = OptionBuilder.withLongOpt(stateModel)
- .withDescription("StateModel Type").create();
- stateModelOption.setArgs(1);
- stateModelOption.setRequired(true);
- stateModelOption.setArgName("StateModel Type (Required)");
-
- // add an option group including either --zkSvr or --configFile
- Option fileOption = OptionBuilder.withLongOpt(configFile)
- .withDescription("Provide file to read states/messages").create();
- fileOption.setArgs(1);
- fileOption.setRequired(true);
- fileOption.setArgName("File to read states/messages (Optional)");
-
- Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
- .withDescription("Provide state trans delay").create();
- transDelayOption.setArgs(1);
- transDelayOption.setRequired(false);
- transDelayOption.setArgName("Delay time in state transition, in MS");
-
- OptionGroup optionGroup = new OptionGroup();
- optionGroup.addOption(zkServerOption);
- optionGroup.addOption(fileOption);
-
- Options options = new Options();
- options.addOption(helpOption);
- // options.addOption(zkServerOption);
- options.addOption(clusterOption);
- options.addOption(hostOption);
- options.addOption(portOption);
- options.addOption(stateModelOption);
- options.addOption(transDelayOption);
-
- options.addOptionGroup(optionGroup);
-
- return options;
- }
-
- public static void printUsage(Options cliOptions)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("java " + ExampleProcess.class.getName(), cliOptions);
- }
-
- public static CommandLine processCommandLineArgs(String[] cliArgs)
- throws Exception
- {
- CommandLineParser cliParser = new GnuParser();
- Options cliOptions = constructCommandLineOptions();
- try
- {
- return cliParser.parse(cliOptions, cliArgs);
- } catch (ParseException pe)
- {
- System.err
- .println("CommandLineClient: failed to parse command-line options: "
- + pe.toString());
- printUsage(cliOptions);
- System.exit(1);
- }
- return null;
- }
-
- public static void main(String[] args) throws Exception
- {
- String zkConnectString = "localhost:2181";
- String clusterName = "storage-integration-cluster";
- String instanceName = "localhost_8905";
- String file = null;
- String stateModelValue = "MasterSlave";
- int delay = 0;
- boolean skipZeroArgs = true;// false is for dev testing
- if (!skipZeroArgs || args.length > 0)
- {
- CommandLine cmd = processCommandLineArgs(args);
- zkConnectString = cmd.getOptionValue(zkServer);
- clusterName = cmd.getOptionValue(cluster);
-
- String host = cmd.getOptionValue(hostAddress);
- String portString = cmd.getOptionValue(hostPort);
- int port = Integer.parseInt(portString);
- instanceName = host + "_" + port;
-
- file = cmd.getOptionValue(configFile);
- if (file != null)
- {
- File f = new File(file);
- if (!f.exists())
- {
- System.err.println("static config file doesn't exist");
- System.exit(1);
- }
- }
-
- stateModelValue = cmd.getOptionValue(stateModel);
- if (cmd.hasOption(transDelay))
- {
- try
- {
- delay = Integer.parseInt(cmd.getOptionValue(transDelay));
- if (delay < 0)
- {
- throw new Exception("delay must be positive");
- }
- } catch (Exception e)
- {
- e.printStackTrace();
- delay = 0;
- }
- }
- }
- // Espresso_driver.py will consume this
- System.out.println("Starting Process with ZK:" + zkConnectString);
-
- ExampleProcess process = new ExampleProcess(zkConnectString, clusterName,
- instanceName, file, stateModelValue, delay);
-
- process.start();
- Thread.currentThread().join();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/IdealStateExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/IdealStateExample.java b/helix-core/src/main/java/com/linkedin/helix/examples/IdealStateExample.java
deleted file mode 100644
index b363118..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/IdealStateExample.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package com.linkedin.helix.examples;
-
-import java.io.File;
-
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZKHelixAdmin;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.StateModelDefinition;
-import com.linkedin.helix.model.IdealState.IdealStateModeProperty;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.StateModelConfigGenerator;
-
-/**
- * Ideal state json format file used in this example for CUSTOMIZED ideal state mode
- * <p>
- * <pre>
- * {
- * "id" : "TestDB",
- * "mapFields" : {
- * "TestDB_0" : {
- * "localhost_12918" : "MASTER",
- * "localhost_12919" : "SLAVE",
- * "localhost_12920" : "SLAVE"
- * },
- * "TestDB_1" : {
- * "localhost_12918" : "MASTER",
- * "localhost_12919" : "SLAVE",
- * "localhost_12920" : "SLAVE"
- * },
- * "TestDB_2" : {
- * "localhost_12918" : "MASTER",
- * "localhost_12919" : "SLAVE",
- * "localhost_12920" : "SLAVE"
- * },
- * "TestDB_3" : {
- * "localhost_12918" : "MASTER",
- * "localhost_12919" : "SLAVE",
- * "localhost_12920" : "SLAVE"
- * }
- * },
- * "listFields" : {
- * },
- * "simpleFields" : {
- * "IDEAL_STATE_MODE" : "CUSTOMIZED",
- * "NUM_PARTITIONS" : "4",
- * "REPLICAS" : "3",
- * "STATE_MODEL_DEF_REF" : "MasterSlave",
- * "STATE_MODEL_FACTORY_NAME" : "DEFAULT"
- * }
- * }
- * </pre>
- *
- */
-
-public class IdealStateExample
-{
-
- public static void main(String[] args) throws Exception
- {
- if (args.length < 3)
- {
- System.err.println("USAGE: IdealStateExample zkAddress clusterName idealStateMode (AUTO, AUTO_REBALANCE, or CUSTOMIZED) idealStateJsonFile (required for CUSTOMIZED mode)");
- System.exit(1);
- }
-
- final String zkAddr = args[0];
- final String clusterName = args[1];
- final String idealStateModeStr = args[2].toUpperCase();
- String idealStateJsonFile = null;
- IdealStateModeProperty idealStateMode =
- IdealStateModeProperty.valueOf(idealStateModeStr);
- if (idealStateMode == IdealStateModeProperty.CUSTOMIZED)
- {
- if (args.length < 4)
- {
- System.err.println("Missng idealStateJsonFile for CUSTOMIZED ideal state mode");
- System.exit(1);
- }
- idealStateJsonFile = args[3];
- }
-
- // add cluster {clusterName}
- ZkClient zkclient =
- new ZkClient(zkAddr,
- ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT,
- new ZNRecordSerializer());
- ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
- admin.addCluster(clusterName, true);
-
- // add MasterSlave state mode definition
- StateModelConfigGenerator generator = new StateModelConfigGenerator();
- admin.addStateModelDef(clusterName,
- "MasterSlave",
- new StateModelDefinition(generator.generateConfigForMasterSlave()));
-
- // add 3 participants: "localhost:{12918, 12919, 12920}"
- for (int i = 0; i < 3; i++)
- {
- int port = 12918 + i;
- InstanceConfig config = new InstanceConfig("localhost_" + port);
- config.setHostName("localhost");
- config.setPort(Integer.toString(port));
- config.setInstanceEnabled(true);
- admin.addInstance(clusterName, config);
- }
-
- // add resource "TestDB" which has 4 partitions and uses MasterSlave state model
- String resourceName = "TestDB";
- if (idealStateMode == IdealStateModeProperty.AUTO
- || idealStateMode == IdealStateModeProperty.AUTO_REBALANCE)
- {
- admin.addResource(clusterName, resourceName, 4, "MasterSlave", idealStateModeStr);
-
- // rebalance resource "TestDB" using 3 replicas
- admin.rebalance(clusterName, resourceName, 3);
- }
- else if (idealStateMode == IdealStateModeProperty.CUSTOMIZED)
- {
- admin.addIdealState(clusterName, resourceName, idealStateJsonFile);
- }
-
- // start helix controller
- new Thread(new Runnable()
- {
-
- @Override
- public void run()
- {
- try
- {
- HelixControllerMain.main(new String[] { "--zkSvr", zkAddr, "--cluster",
- clusterName });
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }).start();
-
- // start 3 dummy participants
- for (int i = 0; i < 3; i++)
- {
- int port = 12918 + i;
- final String instanceName = "localhost_" + port;
- new Thread(new Runnable()
- {
-
- @Override
- public void run()
- {
- DummyParticipant.main(new String[] { zkAddr, clusterName, instanceName });
- }
- }).start();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/LeaderStandbyStateModelFactory.java b/helix-core/src/main/java/com/linkedin/helix/examples/LeaderStandbyStateModelFactory.java
deleted file mode 100644
index 322e3e0..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/LeaderStandbyStateModelFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.examples;
-
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-
-public class LeaderStandbyStateModelFactory extends
- StateModelFactory<StateModel> {
- int _delay;
-
- public LeaderStandbyStateModelFactory(int delay) {
- _delay = delay;
- }
-
- @Override
- public StateModel createNewStateModel(String stateUnitKey) {
- LeaderStandbyStateModel stateModel = new LeaderStandbyStateModel();
- stateModel.setDelay(_delay);
- return stateModel;
- }
-
- public static class LeaderStandbyStateModel extends StateModel {
- int _transDelay = 0;
-
- public void setDelay(int delay) {
- _transDelay = delay > 0 ? delay : 0;
- }
-
- public void onBecomeLeaderFromStandby(Message message,
- NotificationContext context) {
- System.out
- .println("LeaderStandbyStateModel.onBecomeLeaderFromStandby()");
- sleep();
- }
-
- public void onBecomeStandbyFromLeader(Message message,
- NotificationContext context) {
- System.out
- .println("LeaderStandbyStateModel.onBecomeStandbyFromLeader()");
- sleep();
- }
-
- private void sleep() {
- try {
- Thread.sleep(_transDelay);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/MasterSlaveStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/MasterSlaveStateModelFactory.java b/helix-core/src/main/java/com/linkedin/helix/examples/MasterSlaveStateModelFactory.java
deleted file mode 100644
index 58f80a9..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/MasterSlaveStateModelFactory.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.examples;
-
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-
-@SuppressWarnings("rawtypes")
-public class MasterSlaveStateModelFactory extends StateModelFactory<StateModel> {
- int _delay;
-
- public MasterSlaveStateModelFactory(int delay) {
- _delay = delay;
- }
-
- @Override
- public StateModel createNewStateModel(String stateUnitKey) {
- MasterSlaveStateModel stateModel = new MasterSlaveStateModel();
- stateModel.setDelay(_delay);
- stateModel.setStateUnitKey(stateUnitKey);
- return stateModel;
- }
-
- public static class MasterSlaveStateModel extends StateModel {
- int _transDelay = 0;
- String stateUnitKey;
-
- public String getStateUnitKey() {
- return stateUnitKey;
- }
-
- public void setStateUnitKey(String stateUnitKey) {
- this.stateUnitKey = stateUnitKey;
- }
-
- public void setDelay(int delay) {
- _transDelay = delay > 0 ? delay : 0;
- }
-
- public void onBecomeSlaveFromOffline(Message message,
- NotificationContext context) {
-
- System.out.println("MasterSlaveStateModel.onBecomeSlaveFromOffline() for "+ stateUnitKey);
- sleep();
- }
-
- private void sleep() {
- try {
- Thread.sleep(_transDelay);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void onBecomeSlaveFromMaster(Message message,
- NotificationContext context) {
- System.out.println("MasterSlaveStateModel.onBecomeSlaveFromMaster() for "+ stateUnitKey);
- sleep();
-
- }
-
- public void onBecomeMasterFromSlave(Message message,
- NotificationContext context) {
- System.out.println("MasterSlaveStateModel.onBecomeMasterFromSlave() for "+ stateUnitKey);
- sleep();
-
- }
-
- public void onBecomeOfflineFromSlave(Message message,
- NotificationContext context) {
- System.out.println("MasterSlaveStateModel.onBecomeOfflineFromSlave() for "+ stateUnitKey);
- sleep();
-
- }
-
- public void onBecomeDroppedFromOffline(Message message,
- NotificationContext context) {
- System.out.println("ObBecomeDroppedFromOffline() for "+ stateUnitKey);
- sleep();
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/OnlineOfflineStateModelFactory.java b/helix-core/src/main/java/com/linkedin/helix/examples/OnlineOfflineStateModelFactory.java
deleted file mode 100644
index 72c0bdc..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/OnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.examples;
-
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-
-public class OnlineOfflineStateModelFactory extends
- StateModelFactory<StateModel> {
- int _delay;
-
- public OnlineOfflineStateModelFactory(int delay) {
- _delay = delay;
- }
-
- @Override
- public StateModel createNewStateModel(String stateUnitKey) {
- OnlineOfflineStateModel stateModel = new OnlineOfflineStateModel();
- stateModel.setDelay(_delay);
- return stateModel;
- }
-
- public static class OnlineOfflineStateModel extends StateModel {
- int _transDelay = 0;
-
- public void setDelay(int delay) {
- _transDelay = delay > 0 ? delay : 0;
- }
-
- public void onBecomeOnlineFromOffline(Message message,
- NotificationContext context) {
- System.out
- .println("OnlineOfflineStateModel.onBecomeOnlineFromOffline()");
- sleep();
- }
-
- public void onBecomeOfflineFromOnline(Message message,
- NotificationContext context) {
- System.out
- .println("OnlineOfflineStateModel.onBecomeOfflineFromOnline()");
- sleep();
- }
-
- public void onBecomeDroppedFromOffline(Message message,
- NotificationContext context) {
- System.out.println("OnlineOfflineStateModel.onBecomeDroppedFromOffline()");
- sleep();
-
- }
-
- private void sleep() {
- try {
- Thread.sleep(_transDelay);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/package-info.java b/helix-core/src/main/java/com/linkedin/helix/examples/package-info.java
deleted file mode 100644
index 685015c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Examples of using Helix cluster manager
- *
- */
-package com.linkedin.helix.examples;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/AccumulateAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/AccumulateAggregationType.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/AccumulateAggregationType.java
deleted file mode 100644
index 19acb50..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/AccumulateAggregationType.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import org.apache.log4j.Logger;
-
-public class AccumulateAggregationType implements AggregationType
-{
-
- private static final Logger logger = Logger
- .getLogger(AccumulateAggregationType.class);
-
- public final static String TYPE_NAME = "accumulate";
-
- @Override
- public String getName()
- {
- return TYPE_NAME;
- }
-
- @Override
- public String merge(String iv, String ev, long prevTimestamp)
- {
- double inVal = Double.parseDouble(iv);
- double existingVal = Double.parseDouble(ev);
- return String.valueOf(inVal + existingVal);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/AggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/AggregationType.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/AggregationType.java
deleted file mode 100644
index 657f350..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/AggregationType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-public interface AggregationType
-{
-
- // public abstract <T extends Object> T merge(T iv, T ev);
-
- public final static String DELIM = "#";
-
- public abstract String merge(String incomingVal, String existingVal,
- long prevTimestamp);
-
- public abstract String getName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/AggregationTypeFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/AggregationTypeFactory.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/AggregationTypeFactory.java
deleted file mode 100644
index 2fad273..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/AggregationTypeFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.StringTokenizer;
-
-import org.apache.log4j.Logger;
-
-public class AggregationTypeFactory
-{
- private static final Logger logger = Logger
- .getLogger(AggregationTypeFactory.class);
-
- public AggregationTypeFactory()
- {
- }
-
- // TODO: modify this function so that it takes a single string, but can parse
- // apart params from type
- public static AggregationType getAggregationType(String input)
- {
- if (input == null)
- {
- logger.error("AggregationType name is null");
- return null;
- }
- StringTokenizer tok = new StringTokenizer(input, AggregationType.DELIM);
- String type = tok.nextToken();
- int numParams = tok.countTokens();
- String[] params = new String[numParams];
- for (int i = 0; i < numParams; i++)
- {
- if (!tok.hasMoreTokens())
- {
- logger.error("Trying to parse non-existent params");
- return null;
- }
- params[i] = tok.nextToken();
- }
-
- if (type.equals(AccumulateAggregationType.TYPE_NAME))
- {
- return new AccumulateAggregationType();
- }
- else if (type.equals(DecayAggregationType.TYPE_NAME))
- {
- if (params.length < 1)
- {
- logger
- .error("DecayAggregationType must contain <decay weight> parameter");
- return null;
- }
- return new DecayAggregationType(Double.parseDouble(params[0]));
- }
- else if (type.equals(WindowAggregationType.TYPE_NAME))
- {
- if (params.length < 1)
- {
- logger
- .error("WindowAggregationType must contain <window size> parameter");
- }
- return new WindowAggregationType(Integer.parseInt(params[0]));
- }
- else
- {
- logger.error("Unknown AggregationType " + type);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/DecayAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/DecayAggregationType.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/DecayAggregationType.java
deleted file mode 100644
index 5a302d3..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/DecayAggregationType.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-
-public class DecayAggregationType implements AggregationType
-{
-
- private static final Logger logger = Logger
- .getLogger(DecayAggregationType.class);
-
- public final static String TYPE_NAME = "decay";
-
- double _decayFactor = 0.1;
-
- public DecayAggregationType(double df)
- {
- super();
- _decayFactor = df;
- }
-
- @Override
- public String getName()
- {
- StringBuilder sb = new StringBuilder();
- sb.append(TYPE_NAME);
- sb.append(DELIM);
- sb.append(_decayFactor);
- return sb.toString();
- }
-
- @Override
- public String merge(String iv, String ev, long prevTimestamp)
- {
- double incomingVal = Double.parseDouble(iv);
- double existingVal = Double.parseDouble(ev);
- long currTimestamp = System.currentTimeMillis();
- double minutesOld = (currTimestamp - prevTimestamp) / 60000.0;
- // come up with decay coeff for old value. More time passed, the more it
- // decays
- double oldDecayCoeff = Math.pow((1 - _decayFactor), minutesOld);
- return String
- .valueOf((double) (oldDecayCoeff * existingVal + (1 - oldDecayCoeff)
- * incomingVal));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/DefaultHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/DefaultHealthReportProvider.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/DefaultHealthReportProvider.java
deleted file mode 100644
index b55866e..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/DefaultHealthReportProvider.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ZNRecord;
-
-class DefaultHealthReportProvider extends HealthReportProvider
-{
- private static final Logger _logger = Logger
- .getLogger(DefaultHealthReportProvider.class);
-
- public final static String _availableCPUs = "availableCPUs";
- public final static String _freePhysicalMemory = "freePhysicalMemory";
- public final static String _totalJvmMemory = "totalJvmMemory";
- public final static String _freeJvmMemory = "freeJvmMemory";
- public final static String _averageSystemLoad = "averageSystemLoad";
-
- public DefaultHealthReportProvider()
- {
- }
-
- @Override
- public Map<String, String> getRecentHealthReport()
- {
- OperatingSystemMXBean osMxBean = ManagementFactory
- .getOperatingSystemMXBean();
- long freeJvmMemory = Runtime.getRuntime().freeMemory();
- long totalJvmMemory = Runtime.getRuntime().totalMemory();
- int availableCPUs = osMxBean.getAvailableProcessors();
- double avgSystemLoad = osMxBean.getSystemLoadAverage();
- long freePhysicalMemory = Long.MAX_VALUE;
-
- try
- {
- // if( osMxBean instanceof com.sun.management.OperatingSystemMXBean)
- // {
- // com.sun.management.OperatingSystemMXBean sunOsMxBean
- // = (com.sun.management.OperatingSystemMXBean) osMxBean;
- // freePhysicalMemory = sunOsMxBean.getFreePhysicalMemorySize();
- // }
- }
- catch (Throwable t)
- {
- _logger.error(t);
- }
-
- Map<String, String> result = new TreeMap<String, String>();
-
- result.put(_availableCPUs, "" + availableCPUs);
- result.put(_freePhysicalMemory, "" + freePhysicalMemory);
- result.put(_freeJvmMemory, "" + freeJvmMemory);
- result.put(_totalJvmMemory, "" + totalJvmMemory);
- result.put(_averageSystemLoad, "" + avgSystemLoad);
-
- return result;
- }
-
- @Override
- public Map<String, Map<String, String>> getRecentPartitionHealthReport()
- {
- Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
-
- result.put(getReportName(), getRecentHealthReport());
- return result;
- }
-
- @Override
- public void resetStats()
- {
- // TODO Auto-generated method stub
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/DefaultPerfCounters.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/DefaultPerfCounters.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/DefaultPerfCounters.java
deleted file mode 100644
index e4eccd2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/DefaultPerfCounters.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ZNRecord;
-
-@Deprecated
-public class DefaultPerfCounters extends ZNRecord
-{
- private static final Logger _logger = Logger
- .getLogger(DefaultPerfCounters.class);
-
- public final static String _availableCPUs = "availableCPUs";
- public final static String _freePhysicalMemory = "freePhysicalMemory";
- public final static String _totalJvmMemory = "totalJvmMemory";
- public final static String _freeJvmMemory = "freeJvmMemory";
- public final static String _averageSystemLoad = "averageSystemLoad";
-
- public DefaultPerfCounters(String instanceName, long availableCPUs,
- long freePhysicalMemory, long freeJvmMemory, long totalJvmMemory,
- double averageSystemLoad)
- {
- super("DefaultPerfCounters");
- setSimpleField("instanceName", instanceName);
- setSimpleField("createTime", new Date().toString());
-
- setSimpleField(_availableCPUs, "" + availableCPUs);
- setSimpleField(_freePhysicalMemory, "" + freePhysicalMemory);
- setSimpleField(_freeJvmMemory, "" + freeJvmMemory);
- setSimpleField(_totalJvmMemory, "" + totalJvmMemory);
- setSimpleField(_averageSystemLoad, "" + averageSystemLoad);
- }
-
- public long getAvailableCpus()
- {
- return getSimpleLongVal(_availableCPUs);
- }
-
- public double getAverageSystemLoad()
- {
- return getSimpleDoubleVal(_averageSystemLoad);
- }
-
- public long getTotalJvmMemory()
- {
- return getSimpleLongVal(_totalJvmMemory);
- }
-
- public long getFreeJvmMemory()
- {
- return getSimpleLongVal(_freeJvmMemory);
- }
-
- public long getFreePhysicalMemory()
- {
- return getSimpleLongVal(_freePhysicalMemory);
- }
-
- long getSimpleLongVal(String key)
- {
- String strVal = getSimpleField(key);
- if (strVal == null)
- {
- return 0;
- }
- try
- {
- return Long.parseLong(strVal);
- }
- catch (Exception e)
- {
- _logger.warn(e);
- return 0;
- }
- }
-
- double getSimpleDoubleVal(String key)
- {
- String strVal = getSimpleField(key);
- if (strVal == null)
- {
- return 0;
- }
- try
- {
- return Double.parseDouble(strVal);
- }
- catch (Exception e)
- {
- _logger.warn(e);
- return 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/HealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/HealthReportProvider.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/HealthReportProvider.java
deleted file mode 100644
index f923dd8..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/HealthReportProvider.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.Map;
-
-public abstract class HealthReportProvider
-{
- public static final String _defaultPerfCounters = "defaultPerfCounters";
-
- public abstract Map<String, String> getRecentHealthReport();
-
- public Map<String, Map<String, String>> getRecentPartitionHealthReport()
- {
- return null;
- }
-
- public abstract void resetStats();
-
- public String getReportName()
- {
- return _defaultPerfCounters;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/HealthStatsAggregationTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/HealthStatsAggregationTask.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/HealthStatsAggregationTask.java
deleted file mode 100644
index 21ccb7e..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/HealthStatsAggregationTask.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixTimerTask;
-import com.linkedin.helix.controller.pipeline.Pipeline;
-import com.linkedin.helix.controller.pipeline.Stage;
-import com.linkedin.helix.controller.stages.ClusterEvent;
-import com.linkedin.helix.controller.stages.ReadHealthDataStage;
-import com.linkedin.helix.controller.stages.StatsAggregationStage;
-import com.linkedin.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-import com.linkedin.helix.monitoring.mbeans.HelixStageLatencyMonitor;
-
-public class HealthStatsAggregationTask extends HelixTimerTask
-{
- private static final Logger LOG = Logger.getLogger(HealthStatsAggregationTask.class);
-
- public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
-
- private Timer _timer;
- private final HelixManager _manager;
- private final Pipeline _healthStatsAggregationPipeline;
- private final int _delay;
- private final int _period;
- private final ClusterAlertMBeanCollection _alertItemCollection;
- private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
- new HashMap<String, HelixStageLatencyMonitor>();
-
- public HealthStatsAggregationTask(HelixManager manager, int delay, int period)
- {
- _manager = manager;
- _delay = delay;
- _period = period;
-
- // health stats pipeline
- _healthStatsAggregationPipeline = new Pipeline();
- _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
- StatsAggregationStage statAggregationStage = new StatsAggregationStage();
- _healthStatsAggregationPipeline.addStage(statAggregationStage);
- _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
-
- registerStageLatencyMonitor(_healthStatsAggregationPipeline);
- }
-
- public HealthStatsAggregationTask(HelixManager manager)
- {
- this(manager, DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
- }
-
- private void registerStageLatencyMonitor(Pipeline pipeline)
- {
- for (Stage stage : pipeline.getStages())
- {
- String stgName = stage.getStageName();
- if (!_stageLatencyMonitorMap.containsKey(stgName))
- {
- try
- {
- _stageLatencyMonitorMap.put(stage.getStageName(),
- new HelixStageLatencyMonitor(_manager.getClusterName(),
- stgName));
- }
- catch (Exception e)
- {
- LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
- }
- }
- else
- {
- LOG.error("StageLatencyMonitor for stage: " + stgName
- + " already exists. Skip register it");
- }
- }
- }
-
- @Override
- public void start()
- {
- LOG.info("START HealthAggregationTask");
-
- if (_timer == null)
- {
- // Remove all the previous health check values, if any
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- List<String> existingHealthRecordNames = accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
- for(String healthReportName : existingHealthRecordNames)
- {
- LOG.info("Removing old healthrecord " + healthReportName);
- accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),healthReportName));
- }
-
- _timer = new Timer(true);
- _timer.scheduleAtFixedRate(this, new Random().nextInt(_delay), _period);
- }
- else
- {
- LOG.warn("timer already started");
- }
- }
-
- @Override
- public synchronized void stop()
- {
- LOG.info("Stop HealthAggregationTask");
-
- if (_timer != null)
- {
- _timer.cancel();
- _timer = null;
- _alertItemCollection.reset();
-
- for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
- {
- stgLatencyMonitor.reset();
- }
- }
- else
- {
- LOG.warn("timer already stopped");
- }
- }
-
- @Override
- public synchronized void run()
- {
- if (!isEnabled())
- {
- LOG.info("HealthAggregationTask is disabled.");
- return;
- }
-
- if (!_manager.isLeader())
- {
- LOG.error("Cluster manager: " + _manager.getInstanceName()
- + " is not leader. Pipeline will not be invoked");
- return;
- }
-
- try
- {
- ClusterEvent event = new ClusterEvent("healthChange");
- event.addAttribute("helixmanager", _manager);
- event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
-
- _healthStatsAggregationPipeline.handle(event);
- _healthStatsAggregationPipeline.finish();
- }
- catch (Exception e)
- {
- LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
- e);
- }
- }
-
- private boolean isEnabled()
- {
- ConfigAccessor configAccessor = _manager.getConfigAccessor();
- boolean enabled = true;
- if (configAccessor != null)
- {
- // zk-based cluster manager
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
- String isEnabled = configAccessor.get(scope, "healthChange.enabled");
- if (isEnabled != null)
- {
- enabled = new Boolean(isEnabled);
- }
- }
- else
- {
- LOG.debug("File-based cluster manager doesn't support disable healthChange");
- }
- return enabled;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/ParticipantHealthReportCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/ParticipantHealthReportCollector.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/ParticipantHealthReportCollector.java
deleted file mode 100644
index 6e16106..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/ParticipantHealthReportCollector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import com.linkedin.helix.ZNRecord;
-
-public interface ParticipantHealthReportCollector
-{
- public abstract void addHealthReportProvider(HealthReportProvider provider);
-
- public abstract void removeHealthReportProvider(HealthReportProvider provider);
-
- public abstract void reportHealthReportMessage(ZNRecord healthReport);
-
- public abstract void transmitHealthReports();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/ParticipantHealthReportCollectorImpl.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
deleted file mode 100644
index 92ee25b..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.alerts.StatsHolder;
-import com.linkedin.helix.model.HealthStat;
-
-public class ParticipantHealthReportCollectorImpl implements
- ParticipantHealthReportCollector
-{
- private final LinkedList<HealthReportProvider> _healthReportProviderList = new LinkedList<HealthReportProvider>();
- private Timer _timer;
- private static final Logger _logger = Logger
- .getLogger(ParticipantHealthReportCollectorImpl.class);
- private final HelixManager _helixManager;
- String _instanceName;
- public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
-
- public ParticipantHealthReportCollectorImpl(HelixManager helixManager,
- String instanceName)
- {
- _helixManager = helixManager;
- _instanceName = instanceName;
- addDefaultHealthCheckInfoProvider();
- }
-
- private void addDefaultHealthCheckInfoProvider()
- {
- addHealthReportProvider(new DefaultHealthReportProvider());
- }
-
- public void start()
- {
- if (_timer == null)
- {
- _timer = new Timer(true);
- _timer.scheduleAtFixedRate(new HealthCheckInfoReportingTask(),
- new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
- }
- else
- {
- _logger.warn("timer already started");
- }
- }
-
- @Override
- public void addHealthReportProvider(HealthReportProvider provider)
- {
- try
- {
- synchronized (_healthReportProviderList)
- {
- if (!_healthReportProviderList.contains(provider))
- {
- _healthReportProviderList.add(provider);
- }
- else
- {
- _logger.warn("Skipping a duplicated HealthCheckInfoProvider");
- }
- }
- }
- catch (Exception e)
- {
- _logger.error(e);
- }
- }
-
- @Override
- public void removeHealthReportProvider(HealthReportProvider provider)
- {
- synchronized (_healthReportProviderList)
- {
- if (_healthReportProviderList.contains(provider))
- {
- _healthReportProviderList.remove(provider);
- }
- else
- {
- _logger.warn("Skip removing a non-exist HealthCheckInfoProvider");
- }
- }
- }
-
- @Override
- public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate)
- {
- HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-// accessor.setProperty(
-// PropertyType.HEALTHREPORT, healthCheckInfoUpdate, _instanceName,
-// healthCheckInfoUpdate.getId());
- accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()),
- new HealthStat(healthCheckInfoUpdate));
-
- }
-
- public void stop()
- {
- _logger.info("Stop HealthCheckInfoReportingTask");
- if (_timer != null)
- {
- _timer.cancel();
- _timer = null;
- }
- else
- {
- _logger.warn("timer already stopped");
- }
- }
-
- @Override
- public synchronized void transmitHealthReports()
- {
- synchronized (_healthReportProviderList)
- {
- for (HealthReportProvider provider : _healthReportProviderList)
- {
- try
- {
- Map<String, String> report = provider.getRecentHealthReport();
- Map<String, Map<String, String>> partitionReport = provider
- .getRecentPartitionHealthReport();
- ZNRecord record = new ZNRecord(provider.getReportName());
- if (report != null)
- {
- record.setSimpleFields(report);
- }
- if (partitionReport != null)
- {
- record.setMapFields(partitionReport);
- }
- record.setSimpleField(StatsHolder.TIMESTAMP_NAME, "" + System.currentTimeMillis());
-
- HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()),
- new HealthStat(record));
-
-// _helixManager.getDataAccessor().setProperty(
-// PropertyType.HEALTHREPORT, record, _instanceName, record.getId());
- // reset stats (for now just the partition stats)
- provider.resetStats();
- }
- catch (Exception e)
- {
- _logger.error("", e);
- }
- }
- }
- }
-
- class HealthCheckInfoReportingTask extends TimerTask
- {
- @Override
- public void run()
- {
- transmitHealthReports();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/PerformanceHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/PerformanceHealthReportProvider.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/PerformanceHealthReportProvider.java
deleted file mode 100644
index b1fc3cc..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/PerformanceHealthReportProvider.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-public class PerformanceHealthReportProvider extends HealthReportProvider
-{
-
- private static final Logger _logger = Logger
- .getLogger(PerformanceHealthReportProvider.class);
-
- public final static String _testStat = "testStat";
- public final static String _readLatencyStat = "readLatencyStat";
- public final static String _requestCountStat = "requestCountStat";
- public final static String _partitionRequestCountStat = "partitionRequestCountStat";
-
- public static final String _performanceCounters = "performanceCounters";
-
- public int readLatencyCount = 0;
- public double readLatencySum = 0;
-
- public int requestCount = 0;
-
- // private final Map<String, String> _partitionCountsMap = new HashMap<String,
- // String>();
-
- private final Map<String, HashMap<String, String>> _partitionStatMaps = new HashMap<String, HashMap<String, String>>();
-
- public PerformanceHealthReportProvider()
- {
- }
-
- @Override
- public Map<String, String> getRecentHealthReport()
- {
- long testStat = 10;
-
- Map<String, String> result = new TreeMap<String, String>();
-
- result.put(_testStat, "" + testStat);
- result.put(_readLatencyStat, "" + readLatencySum
- / (double) readLatencyCount);
- result.put(_requestCountStat, "" + requestCount);
-
- return result;
- }
-
- @Override
- public Map<String, Map<String, String>> getRecentPartitionHealthReport()
- {
- Map<String, Map<String, String>> result = new TreeMap<String, Map<String, String>>();
- for (String statName : _partitionStatMaps.keySet())
- {
- result.put(statName, _partitionStatMaps.get(statName));
- }
- return result;
- }
-
- HashMap<String, String> getStatMap(String statName, boolean createIfMissing)
- {
- // check if map for this stat exists. if not, create it
- HashMap<String, String> statMap;
- if (!_partitionStatMaps.containsKey(statName))
- {
- if (!createIfMissing)
- {
- return null;
- }
- statMap = new HashMap<String, String>();
- _partitionStatMaps.put(statName, statMap);
- }
- else
- {
- statMap = _partitionStatMaps.get(statName);
- }
- return statMap;
- }
-
- // TODO:
- // Currently participant is source of truth and updates ZK. We want ZK to be
- // source of truth.
- // Revise this approach the participant sends deltas of stats to controller
- // (ZK?) and have controller do aggregation
- // and update ZK. Make sure to wipe the participant between uploads.
- String getPartitionStat(HashMap<String, String> partitionMap,
- String partitionName)
- {
- return partitionMap.get(partitionName);
- }
-
- void setPartitionStat(HashMap<String, String> partitionMap,
- String partitionName, String value)
- {
- partitionMap.put(partitionName, value);
- }
-
- public void incrementPartitionStat(String statName, String partitionName)
- {
- HashMap<String, String> statMap = getStatMap(statName, true);
- String currValStr = getPartitionStat(statMap, partitionName);
- double currVal;
- if (currValStr == null)
- {
- currVal = 1.0;
- }
- else
- {
- currVal = Double.parseDouble(getPartitionStat(statMap, partitionName));
- currVal++;
- }
- setPartitionStat(statMap, partitionName, String.valueOf(currVal));
- }
-
- public void submitPartitionStat(String statName, String partitionName,
- String value)
- {
- HashMap<String, String> statMap = getStatMap(statName, true);
- setPartitionStat(statMap, partitionName, value);
- }
-
- public String getPartitionStat(String statName, String partitionName)
- {
- HashMap<String, String> statMap = getStatMap(statName, false);
- if (statMap == null)
- {
- return null;
- }
- else
- {
- return statMap.get(partitionName);
- }
- }
-
- public void resetStats()
- {
- _partitionStatMaps.clear();
- }
-
- public String getReportName()
- {
- return _performanceCounters;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/healthcheck/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/healthcheck/Stat.java b/helix-core/src/main/java/com/linkedin/helix/healthcheck/Stat.java
deleted file mode 100644
index 684834c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/healthcheck/Stat.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class Stat
-{
-
- private static final Logger _logger = Logger.getLogger(Stat.class);
-
- public final static String OP_TYPE = "HTTP_OP";
- public final static String MEASUREMENT_TYPE = "MEASUREMENT";
- public final static String RESOURCE_NAME = "RESOURCE_NAME";
- public final static String PARTITION_NAME = "PARTITION_NAME";
- public final static String NODE_NAME = "NODE_NAME";
- public final static String TIMESTAMP = "TIMESTAMP";
- public final static String RETURN_STATUS = "RETURN_STATUS";
- public final static String METRIC_NAME = "METRIC_NAME";
- public final static String AGG_TYPE = "AGG_TYPE";
-
- public String _opType;
- public String _measurementType;
- public String _resourceName;
- public String _partitionName;
- public String _nodeName;
- public String _returnStatus;
- public String _metricName;
- public String _aggTypeName;
- public String _timestamp;
-
- public Stat(String opType, String measurementType, String resourceName,
- String partitionName, String nodeName)
- {
- // this(opType, measurementType, resourceName, partitionName, nodeName,
- // null, null, null);
- this(opType, measurementType, resourceName, partitionName, nodeName, null,
- null, null);
- }
-
- public Stat(String opType, String measurementType, String resourceName,
- String partitionName, String nodeName, String returnStatus,
- String metricName, AggregationType aggType)
- {
- this._opType = opType;
- this._measurementType = measurementType;
- this._resourceName = resourceName;
- this._partitionName = partitionName;
- this._nodeName = nodeName;
- this._returnStatus = returnStatus;
- this._metricName = metricName;
- this._aggTypeName = null;
- if (aggType != null)
- {
- this._aggTypeName = aggType.getName();
- }
-
- _timestamp = String.valueOf(System.currentTimeMillis());
- }
-
- public Stat(Map<String, String> in)
- {
- _opType = in.get(OP_TYPE);
- _measurementType = in.get(MEASUREMENT_TYPE);
- _resourceName = in.get(RESOURCE_NAME);
- _partitionName = in.get(PARTITION_NAME);
- _nodeName = in.get(NODE_NAME);
- _timestamp = String.valueOf(System.currentTimeMillis());
- }
-
- public void setAggType(AggregationType aggType)
- {
- this._aggTypeName = aggType.getName();
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (!(obj instanceof Stat))
- {
- return false;
- }
- Stat other = (Stat) obj;
- if (!_partitionName.equals(other._partitionName))
- {
- return false;
- }
- if (!_opType.equals(other._opType))
- {
- return false;
- }
- if (!_measurementType.equals(other._measurementType))
- {
- return false;
- }
- if (!_resourceName.equals(other._resourceName))
- {
- return false;
- }
- if (!_nodeName.equals(other._nodeName))
- {
- return false;
- }
- return true;
- }
-
- @Override
- public int hashCode()
- {
- return (_partitionName + _opType + _measurementType + _resourceName + _nodeName)
- .hashCode();
- }
-
- public void addAlert(long value)
- {
- // TODO Auto-generated method stub
-
- }
-
- public String toString()
- {
- return _nodeName + "." + _resourceName + "." + _partitionName + "."
- + _opType + "." + _measurementType + "." + _returnStatus + "."
- + _metricName + "." + _aggTypeName;
- }
-}