You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[43/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/relay/DummyRelayProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/relay/DummyRelayProcess.java b/helix-core/src/test/java/com/linkedin/helix/mock/relay/DummyRelayProcess.java
deleted file mode 100644
index c8811ae..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/relay/DummyRelayProcess.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.relay;
-
-import java.io.File;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.participant.HelixStateMachineEngine;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class DummyRelayProcess
-{
-
- public static final String zkServer = "zkSvr";
- public static final String cluster = "cluster";
- public static final String hostAddress = "host";
- public static final String hostPort = "port";
- public static final String relayCluster = "relayCluster";
- public static final String help = "help";
- public static final String configFile = "configFile";
-
- private final String zkConnectString;
- private final String clusterName;
- private final String instanceName;
- private HelixManager manager;
- private DummyStateModelFactory stateModelFactory;
- private HelixStateMachineEngine genericStateMachineHandler;
-
- private final String _clusterViewFile;
-
- public DummyRelayProcess(String zkConnectString, String clusterName,
- String instanceName, String clusterViewFile)
- {
- this.zkConnectString = zkConnectString;
- this.clusterName = clusterName;
- this.instanceName = instanceName;
- this._clusterViewFile = clusterViewFile;
- }
-
- public void start() throws Exception
- {
- if (_clusterViewFile == null)
- {
- manager = HelixManagerFactory.getZKHelixManager(clusterName,
- instanceName,
- InstanceType.PARTICIPANT,
- zkConnectString);
- }
- else
- {
- manager = HelixManagerFactory.getStaticFileHelixManager(clusterName,
- instanceName,
- InstanceType.PARTICIPANT,
- _clusterViewFile);
-
- }
-
- stateModelFactory = new DummyStateModelFactory();
- genericStateMachineHandler.registerStateModelFactory("OnlineOffline", stateModelFactory);
- manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
- manager.connect();
- if (_clusterViewFile != null)
- {
- ClusterStateVerifier.verifyFileBasedClusterStates(_clusterViewFile, instanceName,
- stateModelFactory);
-
- }
- }
-
- public static class DummyStateModelFactory extends StateModelFactory<StateModel>
- {
- @Override
- public StateModel createNewStateModel(String stateUnitKey)
- {
- System.out.println("Creating state model for "+ stateUnitKey);
- return new DummyStateModel();
- }
-
- }
-
- public static class DummyStateModel extends StateModel
- {
- public void onBecomeOnlineFromOffline(Message message,
- NotificationContext context)
- {
-
- System.out.println("DummyStateModel.onBecomeSlaveFromOffline()");
- }
-
- public void onBecomeOfflineFromOnline(Message message,
- NotificationContext context)
- {
- System.out.println("DummyStateModel.onBecomeOfflineFromSlave()");
-
- }
- }
-
- @SuppressWarnings("static-access")
- private static Options constructCommandLineOptions()
- {
- Option helpOption = OptionBuilder.withLongOpt(help)
- .withDescription("Prints command-line options info").create();
-
- Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
- .withDescription("Provide zookeeper address").create();
- zkServerOption.setArgs(1);
- zkServerOption.setRequired(true);
- zkServerOption.setArgName("ZookeeperServerAddress(Required)");
-
- Option clusterOption = OptionBuilder.withLongOpt(cluster)
- .withDescription("Provide cluster name").create();
- clusterOption.setArgs(1);
- clusterOption.setRequired(true);
- clusterOption.setArgName("Cluster name (Required)");
-
- Option hostOption = OptionBuilder.withLongOpt(hostAddress)
- .withDescription("Provide host name").create();
- hostOption.setArgs(1);
- hostOption.setRequired(true);
- hostOption.setArgName("Host name (Required)");
-
- Option portOption = OptionBuilder.withLongOpt(hostPort)
- .withDescription("Provide host port").create();
- portOption.setArgs(1);
- portOption.setRequired(true);
- portOption.setArgName("Host port (Required)");
-
- // add an option group including either --zkSvr or --configFile
- Option fileOption = OptionBuilder.withLongOpt(configFile)
- .withDescription("Provide file to read states/messages").create();
- fileOption.setArgs(1);
- fileOption.setRequired(true);
- fileOption.setArgName("File to read states/messages (Optional)");
-
- OptionGroup optionGroup = new OptionGroup();
- optionGroup.addOption(zkServerOption);
- optionGroup.addOption(fileOption);
-
- Options options = new Options();
- options.addOption(helpOption);
- // options.addOption(zkServerOption);
- options.addOption(clusterOption);
- options.addOption(hostOption);
- options.addOption(portOption);
-
- options.addOptionGroup(optionGroup);
-
- return options;
- }
-
- public static void printUsage(Options cliOptions)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
- }
-
- public static CommandLine processCommandLineArgs(String[] cliArgs)
- throws Exception
- {
- CommandLineParser cliParser = new GnuParser();
- Options cliOptions = constructCommandLineOptions();
- CommandLine cmd = null;
-
- try
- {
- return cliParser.parse(cliOptions, cliArgs);
- } catch (ParseException pe)
- {
- System.err
- .println("CommandLineClient: failed to parse command-line options: "
- + pe.toString());
- printUsage(cliOptions);
- System.exit(1);
- }
- return null;
- }
-
- public static void main(String[] args) throws Exception
- {
- String zkConnectString = "pganti-md:2181";
- String clusterName = "SDR_RELAY";
- String instanceName = "ela4-rly02.prod.linkedin.com_10015";
- String file = null;
-
- if (args.length > 0)
- {
- CommandLine cmd = processCommandLineArgs(args);
- zkConnectString = cmd.getOptionValue(zkServer);
- clusterName = cmd.getOptionValue(cluster);
-
- String host = cmd.getOptionValue(hostAddress);
- String portString = cmd.getOptionValue(hostPort);
- int port = Integer.parseInt(portString);
- instanceName = host + "_" + port;
-
- file = cmd.getOptionValue(configFile);
- if (file != null)
- {
- File f = new File(file);
- if (!f.exists())
- {
- System.err.println("static config file doesn't exist");
- System.exit(1);
- }
- }
-
- }
- // Espresso_driver.py will consume this
- System.out.println("Dummy process started");
-
- DummyRelayProcess process = new DummyRelayProcess(zkConnectString,
- clusterName, instanceName, file);
-
- process.start();
- Thread.currentThread().join();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/relay/RelayIdealStateGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/relay/RelayIdealStateGenerator.java b/helix-core/src/test/java/com/linkedin/helix/mock/relay/RelayIdealStateGenerator.java
deleted file mode 100644
index 17c22e2..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/relay/RelayIdealStateGenerator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.relay;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.model.IdealState.IdealStateProperty;
-
-public class RelayIdealStateGenerator
-{
- public static void main(String[] args)
- {
- ZNRecord record = new ZNRecord("SdrRelay");
- record.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "28");
- for (int i = 22; i < 28; i++)
- {
- String key = "ela4-db-sdr.prod.linkedin.com_1521,sdr1,sdr_people_search_,p"
- + i + ",MASTER";
- Map<String, String> map = new HashMap<String, String>();
- for (int j = 0; j < 4; j++)
- {
- String instanceName = "ela4-rly0" + j + ".prod.linkedin.com_10015";
- map.put(instanceName, "ONLINE");
- }
- record.getMapFields().put(key, map);
- }
-
- ZNRecordSerializer serializer = new ZNRecordSerializer();
- System.out.println(new String(serializer.serialize(record)));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/router/MockRouterProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/router/MockRouterProcess.java b/helix-core/src/test/java/com/linkedin/helix/mock/router/MockRouterProcess.java
deleted file mode 100644
index 45a28d6..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/router/MockRouterProcess.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.router;
-
-import java.util.List;
-
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkServer;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.spectator.RoutingTableProvider;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.util.HelixUtil;
-
-/**
- * A MockRouter process to demonstrate the integration with cluster manager.
- * This uses Zookeeper in local mode and runs at port 2188
- *
- * @author kgopalak
- *
- */
-public class MockRouterProcess
-{
- private static final int port = 2188;
- static long runId = System.currentTimeMillis();
- private static final String dataDir = "/tmp/zkDataDir-" + runId;
-
- private static final String logDir = "/tmp/zkLogDir-" + runId;
-
- static String clusterName = "mock-cluster-" + runId;
-
- static String zkConnectString = "localhost:2188";
-
- private final RoutingTableProvider _routingTableProvider;
- private static ZkServer zkServer;
-
- public MockRouterProcess()
- {
- _routingTableProvider = new RoutingTableProvider();
- }
-
- public static void main(String[] args) throws Exception
- {
- setup();
- zkServer.getZkClient().setZkSerializer(new ZNRecordSerializer());
- ZNRecord record = zkServer.getZkClient().readData(
- HelixUtil.getIdealStatePath(clusterName, "TestDB"));
-
- String externalViewPath = HelixUtil.getExternalViewPath(clusterName, "TestDB");
-
- MockRouterProcess process = new MockRouterProcess();
- process.start();
- //try to route, there is no master or slave available
- process.routeRequest("TestDB", "TestDB_1");
-
- //update the externalview on zookeeper
- zkServer.getZkClient().createPersistent(externalViewPath,record);
- //sleep for sometime so that the ZK Callback is received.
- Thread.sleep(1000);
- process.routeRequest("TestDB", "TestDB_1");
- System.exit(1);
- }
-
- private static void setup()
- {
-
- IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
- {
- @Override
- public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient client)
- {
- client.deleteRecursive("/" + clusterName);
-
- }
- };
-
- zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
- zkServer.start();
- ClusterSetup clusterSetup = new ClusterSetup(zkConnectString);
- clusterSetup.setupTestCluster(clusterName);
- try
- {
- Thread.sleep(1000);
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
-
- public void routeRequest(String database, String partition)
- {
- List<InstanceConfig> masters;
- List<InstanceConfig> slaves;
- masters = _routingTableProvider.getInstances(database, partition, "MASTER");
- if (masters != null && !masters.isEmpty())
- {
- System.out.println("Available masters to route request");
- for (InstanceConfig config : masters)
- {
- System.out.println("HostName:" + config.getHostName() + " Port:"
- + config.getPort());
- }
- } else
- {
- System.out.println("No masters available to route request");
- }
- slaves = _routingTableProvider.getInstances(database, partition, "SLAVE");
- if (slaves != null && !slaves.isEmpty())
- {
- System.out.println("Available slaves to route request");
- for (InstanceConfig config : slaves)
- {
- System.out.println("HostName:" + config.getHostName() + " Port:"
- + config.getPort());
- }
- } else
- {
- System.out.println("No slaves available to route request");
- }
- }
-
- public void start()
- {
-
- try
- {
- HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName,
- null,
- InstanceType.SPECTATOR,
- zkConnectString);
-
-
- manager.connect();
- manager.addExternalViewChangeListener(_routingTableProvider);
- } catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/router/RouterAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/router/RouterAdapter.java b/helix-core/src/test/java/com/linkedin/helix/mock/router/RouterAdapter.java
deleted file mode 100644
index 9a01057..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/router/RouterAdapter.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.router;
-
-public class RouterAdapter
-{
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/storage/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/storage/DummyProcess.java b/helix-core/src/test/java/com/linkedin/helix/mock/storage/DummyProcess.java
deleted file mode 100644
index 566635d..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/storage/DummyProcess.java
+++ /dev/null
@@ -1,546 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.storage;
-
-import java.io.File;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.store.file.FilePropertyStore;
-
-public class DummyProcess
-{
- private static final Logger logger = Logger.getLogger(DummyProcess.class);
- public static final String zkServer = "zkSvr";
- public static final String cluster = "cluster";
- public static final String hostAddress = "host";
- public static final String hostPort = "port";
- public static final String relayCluster = "relayCluster";
- public static final String help = "help";
- public static final String clusterViewFile = "clusterViewFile";
- public static final String transDelay = "transDelay";
- public static final String helixManagerType = "helixManagerType";
-// public static final String rootNamespace = "rootNamespace";
-
- private final String _zkConnectString;
- private final String _clusterName;
- private final String _instanceName;
- private DummyStateModelFactory stateModelFactory;
-// private StateMachineEngine genericStateMachineHandler;
-
- private final FilePropertyStore<ZNRecord> _fileStore;
-
- private final String _clusterViewFile;
- private int _transDelayInMs = 0;
- private final String _clusterMangerType;
-
- public DummyProcess(String zkConnectString,
- String clusterName,
- String instanceName,
- String clusterMangerType,
- String clusterViewFile,
- int delay)
- {
- this(zkConnectString, clusterName, instanceName, "zk", clusterViewFile, delay, null);
- }
-
- public DummyProcess(String zkConnectString,
- String clusterName,
- String instanceName,
- String clusterMangerType,
- String clusterViewFile,
- int delay,
- FilePropertyStore<ZNRecord> fileStore)
- {
- _zkConnectString = zkConnectString;
- _clusterName = clusterName;
- _instanceName = instanceName;
- _clusterViewFile = clusterViewFile;
- _clusterMangerType = clusterMangerType;
- _transDelayInMs = delay > 0 ? delay : 0;
- _fileStore = fileStore;
- }
-
- static void sleep(long transDelay)
- {
- try
- {
- if (transDelay > 0)
- {
- Thread.sleep(transDelay);
- }
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public HelixManager start() throws Exception
- {
- HelixManager manager = null;
- // zk cluster manager
- if (_clusterMangerType.equalsIgnoreCase("zk"))
- {
- manager = HelixManagerFactory.getZKHelixManager(_clusterName,
- _instanceName,
- InstanceType.PARTICIPANT,
- _zkConnectString);
- }
- // static file cluster manager
- else if (_clusterMangerType.equalsIgnoreCase("static-file"))
- {
- manager = HelixManagerFactory.getStaticFileHelixManager(_clusterName,
- _instanceName,
- InstanceType.PARTICIPANT,
- _clusterViewFile);
-
- }
- // dynamic file cluster manager
- else if (_clusterMangerType.equalsIgnoreCase("dynamic-file"))
- {
- manager = HelixManagerFactory.getDynamicFileHelixManager(_clusterName,
- _instanceName,
- InstanceType.PARTICIPANT,
- _fileStore);
- }
- else
- {
- throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
- }
-
- stateModelFactory = new DummyStateModelFactory(_transDelayInMs);
- DummyLeaderStandbyStateModelFactory stateModelFactory1 = new DummyLeaderStandbyStateModelFactory(_transDelayInMs);
- DummyOnlineOfflineStateModelFactory stateModelFactory2 = new DummyOnlineOfflineStateModelFactory(_transDelayInMs);
-// genericStateMachineHandler = new StateMachineEngine();
- StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
- stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
-
- manager.connect();
-// manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
- return manager;
- }
-
- public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel>
- {
- int _delay;
-
- public DummyStateModelFactory(int delay)
- {
- _delay = delay;
- }
-
- @Override
- public DummyStateModel createNewStateModel(String stateUnitKey)
- {
- DummyStateModel model = new DummyStateModel();
- model.setDelay(_delay);
- return model;
- }
- }
-
- public static class DummyLeaderStandbyStateModelFactory extends StateModelFactory<DummyLeaderStandbyStateModel>
- {
- int _delay;
-
- public DummyLeaderStandbyStateModelFactory(int delay)
- {
- _delay = delay;
- }
-
- @Override
- public DummyLeaderStandbyStateModel createNewStateModel(String stateUnitKey)
- {
- DummyLeaderStandbyStateModel model = new DummyLeaderStandbyStateModel();
- model.setDelay(_delay);
- return model;
- }
- }
-
- public static class DummyOnlineOfflineStateModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel>
- {
- int _delay;
-
- public DummyOnlineOfflineStateModelFactory(int delay)
- {
- _delay = delay;
- }
-
- @Override
- public DummyOnlineOfflineStateModel createNewStateModel(String stateUnitKey)
- {
- DummyOnlineOfflineStateModel model = new DummyOnlineOfflineStateModel();
- model.setDelay(_delay);
- return model;
- }
- }
- public static class DummyStateModel extends StateModel
- {
- int _transDelay = 0;
-
- public void setDelay(int delay)
- {
- _transDelay = delay > 0 ? delay : 0;
- }
-
- public void onBecomeSlaveFromOffline(Message message,
- NotificationContext context)
- {
- String db = message.getPartitionName();
- String instanceName = context.getManager().getInstanceName();
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyStateModel.onBecomeSlaveFromOffline(), instance:" + instanceName
- + ", db:" + db);
- }
-
- public void onBecomeSlaveFromMaster(Message message,
- NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyStateModel.onBecomeSlaveFromMaster()");
-
- }
-
- public void onBecomeMasterFromSlave(Message message,
- NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyStateModel.onBecomeMasterFromSlave()");
-
- }
-
- public void onBecomeOfflineFromSlave(Message message,
- NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyStateModel.onBecomeOfflineFromSlave()");
-
- }
-
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
-
- }
- }
-
-
- public static class DummyOnlineOfflineStateModel extends StateModel
- {
- int _transDelay = 0;
-
- public void setDelay(int delay)
- {
- _transDelay = delay > 0 ? delay : 0;
- }
-
- public void onBecomeOnlineFromOffline(Message message,
- NotificationContext context)
- {
- String db = message.getPartitionName();
- String instanceName = context.getManager().getInstanceName();
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyStateModel.onBecomeOnlineFromOffline(), instance:" + instanceName
- + ", db:" + db);
- }
-
- public void onBecomeOfflineFromOnline(Message message,
- NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyStateModel.onBecomeOfflineFromOnline()");
-
- }
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
-
- }
- }
-
- public static class DummyLeaderStandbyStateModel extends StateModel
- {
- int _transDelay = 0;
-
- public void setDelay(int delay)
- {
- _transDelay = delay > 0 ? delay : 0;
- }
-
- public void onBecomeLeaderFromStandby(Message message,
- NotificationContext context)
- {
- String db = message.getPartitionName();
- String instanceName = context.getManager().getInstanceName();
- DummyProcess.sleep(_transDelay);
- logger.info("DummyLeaderStandbyStateModel.onBecomeLeaderFromStandby(), instance:" + instanceName
- + ", db:" + db);
- }
-
- public void onBecomeStandbyFromLeader(Message message,
- NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromLeader()");
-
- }
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyLeaderStandbyStateModel.onBecomeDroppedFromOffline()");
-
- }
-
- public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromOffline()");
-
- }
-
- public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
- {
- DummyProcess.sleep(_transDelay);
-
- logger.info("DummyLeaderStandbyStateModel.onBecomeOfflineFromStandby()");
-
- }
- }
-
- // TODO hack OptionBuilder is not thread safe
- @SuppressWarnings("static-access")
- synchronized private static Options constructCommandLineOptions()
- {
- Option helpOption = OptionBuilder.withLongOpt(help)
- .withDescription("Prints command-line options info").create();
-
- Option clusterOption = OptionBuilder.withLongOpt(cluster)
- .withDescription("Provide cluster name").create();
- clusterOption.setArgs(1);
- clusterOption.setRequired(true);
- clusterOption.setArgName("Cluster name (Required)");
-
- Option hostOption = OptionBuilder.withLongOpt(hostAddress)
- .withDescription("Provide host name").create();
- hostOption.setArgs(1);
- hostOption.setRequired(true);
- hostOption.setArgName("Host name (Required)");
-
- Option portOption = OptionBuilder.withLongOpt(hostPort)
- .withDescription("Provide host port").create();
- portOption.setArgs(1);
- portOption.setRequired(true);
- portOption.setArgName("Host port (Required)");
-
- Option cmTypeOption = OptionBuilder.withLongOpt(helixManagerType)
- .withDescription("Provide cluster manager type (e.g. 'zk', 'static-file', or 'dynamic-file'").create();
- cmTypeOption.setArgs(1);
- cmTypeOption.setRequired(true);
- cmTypeOption.setArgName("Clsuter manager type (e.g. 'zk', 'static-file', or 'dynamic-file') (Required)");
-
- // add an option group including either --zkSvr or --clusterViewFile
- Option fileOption = OptionBuilder.withLongOpt(clusterViewFile)
- .withDescription("Provide a cluster-view file for static-file based cluster manager").create();
- fileOption.setArgs(1);
- fileOption.setRequired(true);
- fileOption.setArgName("Cluster-view file (Required for static-file based cluster manager)");
-
- Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
- .withDescription("Provide zookeeper address").create();
- zkServerOption.setArgs(1);
- zkServerOption.setRequired(true);
- zkServerOption.setArgName("ZookeeperServerAddress(Required for zk-based cluster manager)");
-
-// Option rootNsOption = OptionBuilder.withLongOpt(rootNamespace)
-// .withDescription("Provide root namespace for dynamic-file based cluster manager").create();
-// rootNsOption.setArgs(1);
-// rootNsOption.setRequired(true);
-// rootNsOption.setArgName("Root namespace (Required for dynamic-file based cluster manager)");
-
-
- Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
- .withDescription("Provide state trans delay").create();
- transDelayOption.setArgs(1);
- transDelayOption.setRequired(false);
- transDelayOption.setArgName("Delay time in state transition, in MS");
-
- OptionGroup optionGroup = new OptionGroup();
- optionGroup.addOption(zkServerOption);
- optionGroup.addOption(fileOption);
-// optionGroup.addOption(rootNsOption);
-
- Options options = new Options();
- options.addOption(helpOption);
- options.addOption(clusterOption);
- options.addOption(hostOption);
- options.addOption(portOption);
- options.addOption(transDelayOption);
- options.addOption(cmTypeOption);
-
- options.addOptionGroup(optionGroup);
-
- return options;
- }
-
- public static void printUsage(Options cliOptions)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("java " + DummyProcess.class.getName(), cliOptions);
- }
-
- public static CommandLine processCommandLineArgs(String[] cliArgs)
- throws Exception
- {
- CommandLineParser cliParser = new GnuParser();
- Options cliOptions = constructCommandLineOptions();
- // CommandLine cmd = null;
-
- try
- {
- return cliParser.parse(cliOptions, cliArgs);
- } catch (ParseException pe)
- {
- System.err
- .println("CommandLineClient: failed to parse command-line options: "
- + pe.toString());
- printUsage(cliOptions);
- System.exit(1);
- }
- return null;
- }
-
- public static void main(String[] args) throws Exception
- {
- String cmType = "zk";
- String zkConnectString = "localhost:2181";
- String clusterName = "testCluster";
- String instanceName = "localhost_8900";
- String cvFileStr = null;
-// String rootNs = null;
- int delay = 0;
-
- if (args.length > 0)
- {
- CommandLine cmd = processCommandLineArgs(args);
- zkConnectString = cmd.getOptionValue(zkServer);
- clusterName = cmd.getOptionValue(cluster);
-
- String host = cmd.getOptionValue(hostAddress);
- String portString = cmd.getOptionValue(hostPort);
- int port = Integer.parseInt(portString);
- instanceName = host + "_" + port;
- cmType = cmd.getOptionValue(helixManagerType);
-
- if (cmd.hasOption(clusterViewFile))
- {
- cvFileStr = cmd.getOptionValue(clusterViewFile);
- if (!new File(cvFileStr).exists())
- {
- throw new IllegalArgumentException("Cluster-view file:" + cvFileStr
- + " does NOT exist");
- }
- }
-
-// if (cmd.hasOption(rootNamespace))
-// {
-// rootNs = cmd.getOptionValue(rootNamespace);
-// }
-
- if (cmd.hasOption(transDelay))
- {
- try
- {
- delay = Integer.parseInt(cmd.getOptionValue(transDelay));
- if (delay < 0)
- {
- throw new Exception("delay must be positive");
- }
- } catch (Exception e)
- {
- e.printStackTrace();
- delay = 0;
- }
- }
- }
- // Espresso_driver.py will consume this
- logger.info("Dummy process started, instanceName:" + instanceName);
-
- DummyProcess process = new DummyProcess(zkConnectString,
- clusterName,
- instanceName,
- cmType,
- cvFileStr,
- delay);
- HelixManager manager = process.start();
-
- try
- {
- Thread.currentThread().join();
- }
- catch (InterruptedException e)
- {
- // ClusterManagerFactory.disconnectManagers(instanceName);
- logger.info("participant:" + instanceName + ", " +
- Thread.currentThread().getName() + " interrupted");
-// if (manager != null)
-// {
-// manager.disconnect();
-// }
- }
- finally
- {
- if (manager != null)
- {
- manager.disconnect();
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/storage/HealthCheckStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/storage/HealthCheckStateModel.java b/helix-core/src/test/java/com/linkedin/helix/mock/storage/HealthCheckStateModel.java
deleted file mode 100644
index 2315350..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/storage/HealthCheckStateModel.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.storage;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.healthcheck.StatHealthReportProvider;
-import com.linkedin.helix.manager.zk.ZKHelixManager;
-import com.linkedin.helix.mock.consumer.ConsumerAdapter;
-import com.linkedin.helix.mock.consumer.RelayConfig;
-import com.linkedin.helix.mock.consumer.RelayConsumer;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.statemachine.StateModel;
-
-public class HealthCheckStateModel extends StateModel
-{
-
- // private Map<Integer, RelayConsumer> relayConsumersMap;
- private RelayConsumer consumer = null;
- private RelayConfig relayConfig;
- private StorageAdapter storage;
- private StatHealthReportProvider _provider;
- //private StatReporterThread _reporterThread;
- private int _reportInterval;
- private Map<String, Vector<String>> _reportValues;
- private CountDownLatch _countdown;
-
- private static Logger logger = Logger.getLogger(HealthCheckStateModel.class);
-
- public HealthCheckStateModel(String stateUnitKey, StorageAdapter storageAdapter, StatHealthReportProvider provider,
- int reportInterval, Map<String, Vector<String>> reportValues, CountDownLatch countdown)
- {
- // relayConsumersMap = new HashMap<Integer,RelayConsumer>();
- storage = storageAdapter;
- //_reporterThread = new StatReporterThread(provider, reportInterval, reportValues, countdown);
- // this.consumerAdapter = consumerAdapter;
- _provider = provider;
- _reportInterval = reportInterval;
- _reportValues = reportValues;
- _countdown = countdown;
- }
-
- public RelayConfig getRelayConfig()
- {
- return relayConfig;
- }
-
- public void setRelayConfig(RelayConfig relayConfig)
- {
- this.relayConfig = relayConfig;
- }
-
- void checkDebug(Message task) throws Exception
- {
- // For debugging purposes
- if ((Boolean) task.getDebug() == true)
- {
- throw new Exception("Exception for debug");
- }
- }
-
- // @transition(to='to',from='from',blah blah..)
- public void onBecomeSlaveFromOffline(Message task, NotificationContext context)
- throws Exception
- {
-
- logger.info("Becoming slave from offline");
-
- checkDebug(task);
-
- String partition = (String) task.getPartitionName();
- String[] pdata = partition.split("\\.");
- String dbName = pdata[0];
-
- // Initializations for the storage node to create right tables, indexes
- // etc.
- storage.init(partition);
- storage.setPermissions(partition, "READONLY");
-
- // start consuming from the relay
- consumer = storage.getNewRelayConsumer(dbName, partition);
- consumer.start();
- // TODO: how do we know we are caught up?
-
- logger.info("Became slave for partition " + partition);
- }
-
- // @transition(to='to',from='from',blah blah..)
- public void onBecomeSlaveFromMaster(Message task, NotificationContext context)
- throws Exception
- {
-
- logger.info("Becoming slave from master");
-
- checkDebug(task);
-
- String partition = (String) task.getPartitionName();
- String[] pdata = partition.split("\\.");
- String dbName = pdata[0];
- storage.setPermissions(partition, "READONLY");
- storage.waitForWrites(partition);
-
- // start consuming from the relay
- consumer = storage.getNewRelayConsumer(dbName, partition);
- consumer.start();
-
- logger.info("Becamse slave for partition " + partition);
- }
-
- // @transition(to='to',from='from',blah blah..)
- public void onBecomeMasterFromSlave(Message task, NotificationContext context)
- throws Exception
- {
- logger.info("Becoming master from slave");
-
- checkDebug(task);
-
- String partition = (String) task.getPartitionName();
-
- // stop consumer and refetch from all so all changes are drained
- consumer.flush(); // blocking call
-
- // TODO: publish the hwm somewhere
- long hwm = consumer.getHwm();
- storage.setHwm(partition, hwm);
- storage.removeConsumer(partition);
- consumer = null;
-
- // set generation in storage
- Integer generationId = (Integer) task.getGeneration();
- storage.setGeneration(partition, generationId);
-
- storage.setPermissions(partition, "READWRITE");
-
- String[] pdata = partition.split("\\.");
- String dbName = pdata[0];
-
- HelixManager manager = context.getManager();
-
- //start the reporting thread
- logger.debug("Starting stats reporting thread");
- StatReporterThread reporterThread = new StatReporterThread(manager, _provider, dbName, partition,
- _reportInterval, _reportValues, _countdown);
- Thread t = new Thread(reporterThread);
- t.run();
- logger.info("Became master for partition " + partition);
- }
-
- // @transition(to='to',from='from',blah blah..)
- public void onBecomeOfflineFromSlave(Message task, NotificationContext context)
- throws Exception
- {
-
- logger.info("Becoming offline from slave");
-
- checkDebug(task);
-
- String partition = (String) task.getPartitionName();
-
- consumer.stop();
- storage.removeConsumer(partition);
- consumer = null;
-
- storage.setPermissions(partition, "OFFLINE");
-
- logger.info("Became offline for partition " + partition);
- }
-
- public static String formStatName(String dbName, String partitionName, String metricName)
- {
- String statName;
- statName = "db"+dbName+".partition"+partitionName+"."+metricName;
- return statName;
- }
-
- public class StatReporterThread implements Runnable
- {
- private HelixManager _manager;
- private int _reportInterval;
- private Map<String, Vector<String>> _reportValues;
- private CountDownLatch _countdown;
- private StatHealthReportProvider _provider;
- private String _dbName;
- private String _partitionName;
-
- public StatReporterThread(HelixManager manager, StatHealthReportProvider provider, String dbName,
- String partitionName, int reportInterval,
- Map<String,Vector<String>> reportValues, CountDownLatch countdown)
- {
- _manager = manager;
- _reportInterval = reportInterval;
- _reportValues = reportValues;
- _countdown = countdown;
- _provider = provider;
- _dbName = dbName;
- _partitionName = partitionName;
- }
-
- @Override
- public void run()
- {
- boolean doneWithStats = false;
- while (!doneWithStats) {
- doneWithStats = true;
- try {
- Thread.sleep(_reportInterval);
- } catch (InterruptedException e) {
- logger.error("Unable to sleep, stats not getting staggered, "+e);
- }
- for (String metricName : _reportValues.keySet()) {
- Vector<String> currValues = _reportValues.get(metricName);
- if (currValues.size() > 0) {
- doneWithStats = false;
- String statName = formStatName(_dbName, _partitionName, metricName);
- String currValue = currValues.remove(0);
- Long currTimestamp = System.currentTimeMillis();
- _provider.writeStat(statName, currValue, String.valueOf(currTimestamp));
- }
- }
- _manager.getHealthReportCollector().transmitHealthReports();
- }
-
- _countdown.countDown();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/storage/HealthCheckStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/storage/HealthCheckStateModelFactory.java b/helix-core/src/test/java/com/linkedin/helix/mock/storage/HealthCheckStateModelFactory.java
deleted file mode 100644
index c8031b7..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/storage/HealthCheckStateModelFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.storage;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-
-public class HealthCheckStateModelFactory extends StateModelFactory
-{
- private static Logger logger = Logger
- .getLogger(HealthCheckStateModelFactory.class);
-
- private StorageAdapter storageAdapter;
-
- // private ConsumerAdapter consumerAdapter;
-
- public HealthCheckStateModelFactory(StorageAdapter storage)
- {
- storageAdapter = storage;
- }
-
- HealthCheckStateModel getStateModelForPartition(Integer partition)
- {
- return null;
- }
-
- @Override
- public StateModel createNewStateModel(String stateUnitKey)
- {
- logger.info("HealthCheckStateModelFactory.getStateModel()");
- //TODO: fix these parameters
- return new HealthCheckStateModel(stateUnitKey, storageAdapter, null, 0, null, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockEspressoHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockEspressoHealthReportProvider.java b/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockEspressoHealthReportProvider.java
deleted file mode 100644
index 6e71a5d..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockEspressoHealthReportProvider.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.storage;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.linkedin.helix.alerts.StatsHolder;
-import com.linkedin.helix.healthcheck.HealthReportProvider;
-
-public class MockEspressoHealthReportProvider extends HealthReportProvider {
-
- private final String _reportName = "RestQueryStats";
- private HashMap<String, Map<String,String>> _statMap;
- private final String DB_NAME = "DBName";
-
- public MockEspressoHealthReportProvider()
- {
- super();
- _statMap = new HashMap<String, Map<String,String>>();
- }
-
- public String buildMapKey(String dbName)
- {
- return _reportName+"@"+DB_NAME+"="+dbName;
- }
-
- public void setStat(String dbName, String statName, String statVal)
- {
- String currTime = String.valueOf(System.currentTimeMillis());
- setStat(dbName, statName, statVal, currTime);
- }
-
- /*
- * This version takes a fixed timestamp to ease with testing
- */
- public void setStat(String dbName, String statName, String statVal, String timestamp)
- {
- String key = buildMapKey(dbName);
- Map<String, String> dbStatMap = _statMap.get(key);
- if (dbStatMap == null) {
- dbStatMap = new HashMap<String,String>();
- _statMap.put(key, dbStatMap);
- }
- dbStatMap.put(statName, statVal);
- dbStatMap.put(StatsHolder.TIMESTAMP_NAME, timestamp);
- }
-
- @Override
- public Map<String, String> getRecentHealthReport() {
- return null;
- }
-
- @Override
- public Map<String, Map<String, String>> getRecentPartitionHealthReport() {
- return _statMap;
- }
-
- @Override
- public void resetStats() {
- _statMap.clear();
- }
-
- public String getReportName()
- {
- return _reportName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockHealthReportParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockHealthReportParticipant.java b/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockHealthReportParticipant.java
deleted file mode 100644
index c69499b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockHealthReportParticipant.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.storage;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.healthcheck.HealthReportProvider;
-
-public class MockHealthReportParticipant
-{
- private static final Logger LOG =
- Logger.getLogger(MockHealthReportParticipant.class);
- public static final String zkServer = "zkSvr";
- public static final String cluster = "cluster";
- public static final String host = "host";
- public static final String port = "port";
- public static final String help = "help";
-
- static class MockHealthReportProvider extends HealthReportProvider
- {
- private final String _reportName = "MockRestQueryStats";
- private final Map<String, Map<String, String>> _mockHealthReport;
-
- public MockHealthReportProvider()
- {
- _mockHealthReport = new HashMap<String, Map<String, String>>();
-
- Map<String, String> reportMap = new HashMap<String, String>();
- _mockHealthReport.put("MockRestQueryStats@DBName=BizProfile", reportMap);
-
- reportMap.put("MeanMysqlLatency", "2.132700625");
- reportMap.put("95PercentileLatencyLucene", "108.40825525");
- reportMap.put("99PercentileLatencyMysql", "9.369827");
- reportMap.put("99PercentileLatencyServer", "167.714208");
- reportMap.put("95PercentileLatencyMysqlPool", "8.03621375");
- reportMap.put("95PercentileLatencyServer", "164.68374265");
- reportMap.put("MinLuceneLatency", "1.765908");
- reportMap.put("MaxServerLatency", "167.714208");
- reportMap.put("MeanLuceneLatency", "16.107599458333336");
- reportMap.put("CollectorName", "RestQueryStats");
- reportMap.put("MeanLucenePoolLatency", "8.120545333333332");
- reportMap.put("99PercentileLatencyLucenePool", "65.930564");
- reportMap.put("MinServerLatency", "0.425272");
- reportMap.put("IndexStoreMismatchCount", "0");
- reportMap.put("ErrorCount", "0");
- reportMap.put("MeanMysqlPoolLatency", "1.0704102916666667");
- reportMap.put("MinLucenePoolLatency", "0.008189");
- reportMap.put("MinMysqlLatency", "0.709691");
- reportMap.put("MaxMysqlPoolLatency", "8.606973");
- reportMap.put("99PercentileLatencyMysqlPool", "8.606973");
- reportMap.put("MinMysqlPoolLatency", "0.091883");
- reportMap.put("MaxLucenePoolLatency", "65.930564");
- reportMap.put("99PercentileLatencyLucene", "111.78799");
- reportMap.put("MaxMysqlLatency", "9.369827");
- reportMap.put("TimeStamp", "1332895048143");
- reportMap.put("MeanConcurrencyLevel", "1.9");
- reportMap.put("95PercentileLatencyMysql", "8.96594875");
- reportMap.put("QueryStartCount", "0");
- reportMap.put("95PercentileLatencyLucenePool", "63.518656500000006");
- reportMap.put("MeanServerLatency", "39.5451532");
- reportMap.put("MaxLuceneLatency", "111.78799");
- reportMap.put("QuerySuccessCount", "0");
- }
-
- @Override
- public Map<String, String> getRecentHealthReport()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void resetStats()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public Map<String, Map<String, String>> getRecentPartitionHealthReport()
- {
- // tweak: randomly change the last digit
- for (String key1 : _mockHealthReport.keySet())
- {
- Map<String, String> reportMap = _mockHealthReport.get(key1);
- for (String key2 : reportMap.keySet())
- {
- String value = reportMap.get(key2);
- String lastDigit = "" + new Random().nextInt(10);
- value = value.substring(0, value.length() - 1) + lastDigit;
- reportMap.put(key2, value);
- }
- }
-
- return _mockHealthReport;
- }
-
- @Override
- public String getReportName()
- {
- return _reportName;
- }
- }
-
- static class MockHealthReportJob implements MockJobIntf
- {
-
- @Override
- public void doPreConnectJob(HelixManager manager)
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void doPostConnectJob(HelixManager manager)
- {
- // TODO Auto-generated method stub
- manager.getHealthReportCollector()
- .addHealthReportProvider(new MockHealthReportProvider());
-
-// // set property store path for perf test
-// final String setPath = "/TEST_PERF/set";
-// final String updatePath = "/TEST_PERF/update";
-// manager.getHelixPropertyStore().create(setPath, new ZNRecord(setPath), BaseDataAccessor.Option.PERSISTENT);
-// manager.getHelixPropertyStore().set(updatePath, new ZNRecord(updatePath), BaseDataAccessor.Option.PERSISTENT);
- }
-
- }
-
- // hack OptionBuilder is not thread safe
- @SuppressWarnings("static-access")
- synchronized private static Options constructCommandLineOptions()
- {
- Option helpOption =
- OptionBuilder.withLongOpt(help)
- .withDescription("Prints command-line options info")
- .create();
-
- Option clusterOption =
- OptionBuilder.withLongOpt(cluster)
- .withDescription("Provide cluster name")
- .create();
- clusterOption.setArgs(1);
- clusterOption.setRequired(true);
- clusterOption.setArgName("Cluster name (Required)");
-
- Option hostOption =
- OptionBuilder.withLongOpt(host).withDescription("Provide host name").create();
- hostOption.setArgs(1);
- hostOption.setRequired(true);
- hostOption.setArgName("Host name (Required)");
-
- Option portOption =
- OptionBuilder.withLongOpt(port).withDescription("Provide host port").create();
- portOption.setArgs(1);
- portOption.setRequired(true);
- portOption.setArgName("Host port (Required)");
-
- Option zkServerOption =
- OptionBuilder.withLongOpt(zkServer)
- .withDescription("Provide zookeeper address")
- .create();
- zkServerOption.setArgs(1);
- zkServerOption.setRequired(true);
- zkServerOption.setArgName("Zookeeper server address(Required)");
-
- Options options = new Options();
- options.addOption(helpOption);
- options.addOption(clusterOption);
- options.addOption(hostOption);
- options.addOption(portOption);
- options.addOption(zkServerOption);
-
- return options;
- }
-
- public static void printUsage(Options cliOptions)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("java " + MockHealthReportParticipant.class.getName(),
- cliOptions);
- }
-
- public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
- {
- CommandLineParser cliParser = new GnuParser();
- Options cliOptions = constructCommandLineOptions();
-
- try
- {
-
- return cliParser.parse(cliOptions, cliArgs);
- }
- catch (ParseException pe)
- {
- System.err.println("CommandLineClient: failed to parse command-line options: "
- + pe.toString());
- printUsage(cliOptions);
- System.exit(1);
- }
- return null;
- }
-
- // NOT working for kill -9, working for kill -2/-15
- static class MockHealthReportParticipantShutdownHook extends Thread
- {
- final MockParticipant _participant;
-
- MockHealthReportParticipantShutdownHook(MockParticipant participant)
- {
- _participant = participant;
- }
-
- @Override
- public void run()
- {
- LOG.info("MockHealthReportParticipantShutdownHook invoked");
- _participant.syncStop();
- }
- }
-
- public static void main(String[] args) throws Exception
- {
- CommandLine cmd = processCommandLineArgs(args);
- String zkConnectStr = cmd.getOptionValue(zkServer);
- String clusterName = cmd.getOptionValue(cluster);
- String hostStr = cmd.getOptionValue(host);
- String portStr = cmd.getOptionValue(port);
-
- String instanceName = hostStr + "_" + portStr;
-
- MockParticipant participant =
- new MockParticipant(clusterName,
- instanceName,
- zkConnectStr,
- null, // new StoreAccessDiffNodeTransition(), // new StoreAccessOneNodeTransition(),
- new MockHealthReportJob());
- Runtime.getRuntime()
- .addShutdownHook(new MockHealthReportParticipantShutdownHook(participant));
-
- // Espresso_driver.py will consume this
- System.out.println("MockHealthReportParticipant process started, instanceName: "
- + instanceName);
-
- participant.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockJobIntf.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockJobIntf.java b/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockJobIntf.java
deleted file mode 100644
index 6fabe1e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockJobIntf.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.storage;
-
-import com.linkedin.helix.HelixManager;
-
-public interface MockJobIntf
-{
- public void doPreConnectJob(HelixManager manager);
- public void doPostConnectJob(HelixManager manager);
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockParticipant.java b/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockParticipant.java
deleted file mode 100644
index fe9cd57..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockParticipant.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.storage;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixManagerFactory;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.mock.storage.DummyProcess.DummyLeaderStandbyStateModelFactory;
-import com.linkedin.helix.mock.storage.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.participant.statemachine.StateModelInfo;
-import com.linkedin.helix.participant.statemachine.Transition;
-import com.linkedin.helix.store.zk.ZkHelixPropertyStore;
-
-public class MockParticipant extends Thread
-{
- private static Logger LOG =
- Logger.getLogger(MockParticipant.class);
- private final String _clusterName;
- private final String _instanceName;
- // private final String _zkAddr;
-
- private final CountDownLatch _startCountDown = new CountDownLatch(1);
- private final CountDownLatch _stopCountDown = new CountDownLatch(1);
- private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
- private final HelixManager _manager;
- private final StateModelFactory _msModelFactory;
- private final MockJobIntf _job;
-
- // mock master-slave state model
- @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
- public static class MockMSStateModel extends StateModel
- {
- protected MockTransition _transition;
-
- public MockMSStateModel(MockTransition transition)
- {
- _transition = transition;
- }
-
- public void setTransition(MockTransition transition)
- {
- _transition = transition;
- }
-
- @Transition(to = "SLAVE", from = "OFFLINE")
- public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException
- {
- LOG.info("Become SLAVE from OFFLINE");
- if (_transition != null)
- {
- _transition.doTransition(message, context);
-
- }
- }
-
- @Transition(to = "MASTER", from = "SLAVE")
- public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException
- {
- LOG.info("Become MASTER from SLAVE");
- if (_transition != null)
- {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "SLAVE", from = "MASTER")
- public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException
- {
- LOG.info("Become SLAVE from MASTER");
- if (_transition != null)
- {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "OFFLINE", from = "SLAVE")
- public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException
- {
- LOG.info("Become OFFLINE from SLAVE");
- if (_transition != null)
- {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "DROPPED", from = "OFFLINE")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws InterruptedException
- {
- LOG.info("Become DROPPED from OFFLINE");
- if (_transition != null)
- {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "OFFLINE", from = "ERROR")
- public void onBecomeOfflineFromError(Message message, NotificationContext context) throws InterruptedException
- {
- LOG.info("Become OFFLINE from ERROR");
- // System.err.println("Become OFFLINE from ERROR");
- if (_transition != null)
- {
- _transition.doTransition(message, context);
- }
- }
-
- @Override
- public void reset()
- {
- LOG.info("Default MockMSStateModel.reset() invoked");
- if (_transition != null)
- {
- _transition.doReset();
- }
- }
- }
-
- // mock master slave state model factory
- public static class MockMSModelFactory extends StateModelFactory<MockMSStateModel>
- {
- private final MockTransition _transition;
-
- public MockMSModelFactory()
- {
- this(null);
- }
-
- public MockMSModelFactory(MockTransition transition)
- {
- _transition = transition;
- }
-
- public void setTrasition(MockTransition transition)
- {
- Map<String, MockMSStateModel> stateModelMap = getStateModelMap();
- for (MockMSStateModel stateModel : stateModelMap.values())
- {
- stateModel.setTransition(transition);
- }
- }
-
- @Override
- public MockMSStateModel createNewStateModel(String partitionKey)
- {
- MockMSStateModel model = new MockMSStateModel(_transition);
-
- return model;
- }
- }
-
- // mock STORAGE_DEFAULT_SM_SCHEMATA state model
- @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "DROPPED", "ERROR" })
- public class MockSchemataStateModel extends StateModel
- {
- @Transition(to = "MASTER", from = "OFFLINE")
- public void onBecomeMasterFromOffline(Message message, NotificationContext context)
- {
- LOG.info("Become MASTER from OFFLINE");
- }
-
- @Transition(to = "OFFLINE", from = "MASTER")
- public void onBecomeOfflineFromMaster(Message message, NotificationContext context)
- {
- LOG.info("Become OFFLINE from MASTER");
- }
-
- @Transition(to = "DROPPED", from = "OFFLINE")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
- {
- LOG.info("Become DROPPED from OFFLINE");
- }
-
- @Transition(to = "OFFLINE", from = "ERROR")
- public void onBecomeOfflineFromError(Message message, NotificationContext context)
- {
- LOG.info("Become OFFLINE from ERROR");
- }
- }
-
- // mock Bootstrap state model
- @StateModelInfo(initialState = "OFFLINE", states = { "ONLINE", "BOOTSTRAP", "OFFLINE",
- "IDLE" })
- public static class MockBootstrapStateModel extends StateModel
- {
- // Overwrite the default value of intial state
- MockBootstrapStateModel()
- {
- _currentState = "IDLE";
- }
-
- @Transition(to = "OFFLINE", from = "IDLE")
- public void onBecomeOfflineFromIdle(Message message, NotificationContext context)
- {
- LOG.info("Become OFFLINE from IDLE");
- }
-
- @Transition(to = "BOOTSTRAP", from = "OFFLINE")
- public void onBecomeBootstrapFromOffline(Message message, NotificationContext context)
- {
- LOG.info("Become BOOTSTRAP from OFFLINE");
- }
-
- @Transition(to = "ONLINE", from = "BOOSTRAP")
- public void onBecomeOnlineFromBootstrap(Message message, NotificationContext context)
- {
- LOG.info("Become ONLINE from BOOTSTRAP");
- }
-
- @Transition(to = "OFFLINE", from = "ONLINE")
- public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
- {
- LOG.info("Become OFFLINE from ONLINE");
- }
- }
-
- // mock STORAGE_DEFAULT_SM_SCHEMATA state model factory
- public class MockSchemataModelFactory extends StateModelFactory<MockSchemataStateModel>
- {
- @Override
- public MockSchemataStateModel createNewStateModel(String partitionKey)
- {
- MockSchemataStateModel model = new MockSchemataStateModel();
- return model;
- }
- }
-
- // mock Bootstrap state model factory
- public static class MockBootstrapModelFactory extends
- StateModelFactory<MockBootstrapStateModel>
- {
- @Override
- public MockBootstrapStateModel createNewStateModel(String partitionKey)
- {
- MockBootstrapStateModel model = new MockBootstrapStateModel();
- return model;
- }
- }
-
- // simulate error transition
- public static class ErrTransition extends MockTransition
- {
- private final Map<String, Set<String>> _errPartitions;
-
- public ErrTransition(Map<String, Set<String>> errPartitions)
- {
- if (errPartitions != null)
- {
- // change key to upper case
- _errPartitions = new HashMap<String, Set<String>>();
- for (String key : errPartitions.keySet())
- {
- String upperKey = key.toUpperCase();
- _errPartitions.put(upperKey, errPartitions.get(key));
- }
- }
- else
- {
- _errPartitions = Collections.emptyMap();
- }
- }
-
- @Override
- public void doTransition(Message message, NotificationContext context)
- {
- String fromState = message.getFromState();
- String toState = message.getToState();
- String partition = message.getPartitionName();
-
- String key = (fromState + "-" + toState).toUpperCase();
- if (_errPartitions.containsKey(key) && _errPartitions.get(key).contains(partition))
- {
- String errMsg =
- "IGNORABLE: test throw exception for " + partition + " transit from "
- + fromState + " to " + toState;
- throw new RuntimeException(errMsg);
- }
- }
- }
-
- // simulate long transition
- public static class SleepTransition extends MockTransition
- {
- private final long _delay;
-
- public SleepTransition(long delay)
- {
- _delay = delay > 0 ? delay : 0;
- }
-
- @Override
- public void doTransition(Message message, NotificationContext context) throws InterruptedException
- {
- Thread.sleep(_delay);
-
- }
- }
-
- // simulate access property store and update one znode
- public static class StoreAccessOneNodeTransition extends MockTransition
- {
- @Override
- public void doTransition(Message message, NotificationContext context) throws InterruptedException
- {
- HelixManager manager = context.getManager();
- ZkHelixPropertyStore<ZNRecord> store = manager.getHelixPropertyStore();
- final String setPath = "/TEST_PERF/set";
- final String updatePath = "/TEST_PERF/update";
- final String key = message.getPartitionName();
- try
- {
- // get/set once
- ZNRecord record = null;
- try
- {
- record = store.get(setPath, null, 0);
- }
- catch (ZkNoNodeException e)
- {
- record = new ZNRecord(setPath);
- }
- record.setSimpleField("setTimestamp", "" + System.currentTimeMillis());
- store.set(setPath, record, AccessOption.PERSISTENT);
-
- // update once
- store.update(updatePath, new DataUpdater<ZNRecord>()
- {
-
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- if (currentData == null)
- {
- currentData = new ZNRecord(updatePath);
- }
- currentData.setSimpleField(key, "" + System.currentTimeMillis());
-
- return currentData;
- }
-
- }, AccessOption.PERSISTENT);
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- }
-
- // simulate access property store and update different znodes
- public static class StoreAccessDiffNodeTransition extends MockTransition
- {
- @Override
- public void doTransition(Message message, NotificationContext context) throws InterruptedException
- {
- HelixManager manager = context.getManager();
- ZkHelixPropertyStore<ZNRecord> store = manager.getHelixPropertyStore();
- final String setPath = "/TEST_PERF/set/" + message.getPartitionName();
- final String updatePath = "/TEST_PERF/update/" + message.getPartitionName();
- // final String key = message.getPartitionName();
- try
- {
- // get/set once
- ZNRecord record = null;
- try
- {
- record = store.get(setPath, null, 0);
- }
- catch (ZkNoNodeException e)
- {
- record = new ZNRecord(setPath);
- }
- record.setSimpleField("setTimestamp", "" + System.currentTimeMillis());
- store.set(setPath, record, AccessOption.PERSISTENT);
-
- // update once
- store.update(updatePath, new DataUpdater<ZNRecord>()
- {
-
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- if (currentData == null)
- {
- currentData = new ZNRecord(updatePath);
- }
- currentData.setSimpleField("updateTimestamp", "" + System.currentTimeMillis());
-
- return currentData;
- }
-
- }, AccessOption.PERSISTENT);
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- }
-
- public MockParticipant(String clusterName, String instanceName, String zkAddr) throws Exception
- {
- this(clusterName, instanceName, zkAddr, null, null);
- }
-
- public MockParticipant(String clusterName,
- String instanceName,
- String zkAddr,
- MockTransition transition) throws Exception
- {
- this(clusterName, instanceName, zkAddr, transition, null);
- }
-
- public MockParticipant(String clusterName,
- String instanceName,
- String zkAddr,
- MockTransition transition,
- MockJobIntf job) throws Exception
- {
- _clusterName = clusterName;
- _instanceName = instanceName;
- _msModelFactory = new MockMSModelFactory(transition);
-
- _manager =
- HelixManagerFactory.getZKHelixManager(_clusterName,
- _instanceName,
- InstanceType.PARTICIPANT,
- zkAddr);
- _job = job;
- }
-
- public MockParticipant(StateModelFactory factory,
- String clusterName,
- String instanceName,
- String zkAddr,
- MockJobIntf job) throws Exception
- {
- _clusterName = clusterName;
- _instanceName = instanceName;
- _msModelFactory = factory;
-
- _manager =
- HelixManagerFactory.getZKHelixManager(_clusterName,
- _instanceName,
- InstanceType.PARTICIPANT,
- zkAddr);
- _job = job;
- }
-
- public StateModelFactory getStateModelFactory()
- {
- return _msModelFactory;
- }
-
- public MockParticipant(HelixManager manager, MockTransition transition)
- {
- _clusterName = manager.getClusterName();
- _instanceName = manager.getInstanceName();
- _manager = manager;
-
- _msModelFactory = new MockMSModelFactory(transition);
- _job = null;
- }
-
- public void setTransition(MockTransition transition)
- {
- if (_msModelFactory instanceof MockMSModelFactory)
- {
- ((MockMSModelFactory) _msModelFactory).setTrasition(transition);
- }
- }
-
- public HelixManager getManager()
- {
- return _manager;
- }
-
- public String getInstanceName()
- {
- return _instanceName;
- }
-
- public String getClusterName()
- {
- return _clusterName;
- }
-
- public void syncStop()
- {
- _stopCountDown.countDown();
- try
- {
- _waitStopFinishCountDown.await();
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // synchronized (_manager)
- // {
- // _manager.disconnect();
- // }
- }
-
- public void syncStart()
- {
- super.start();
- try
- {
- _startCountDown.await();
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- @Override
- public void run()
- {
- try
- {
- StateMachineEngine stateMach = _manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
-
- DummyLeaderStandbyStateModelFactory lsModelFactory =
- new DummyLeaderStandbyStateModelFactory(10);
- DummyOnlineOfflineStateModelFactory ofModelFactory =
- new DummyOnlineOfflineStateModelFactory(10);
- stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
- stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
-
- MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
- stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
- // MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
- // stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
-
- if (_job != null)
- {
- _job.doPreConnectJob(_manager);
- }
-
- _manager.connect();
- _startCountDown.countDown();
-
- if (_job != null)
- {
- _job.doPostConnectJob(_manager);
- }
-
- _stopCountDown.await();
- }
- catch (InterruptedException e)
- {
- String msg =
- "participant: " + _instanceName + ", " + Thread.currentThread().getName()
- + " is interrupted";
- LOG.info(msg);
- System.err.println(msg);
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- finally
- {
- _startCountDown.countDown();
-
- synchronized (_manager)
- {
- _manager.disconnect();
- }
- _waitStopFinishCountDown.countDown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockStorageProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockStorageProcess.java b/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockStorageProcess.java
deleted file mode 100644
index de6d17e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/mock/storage/MockStorageProcess.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.mock.storage;
-
-import org.apache.log4j.*;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-import com.linkedin.helix.manager.zk.ZKDataAccessor;
-import com.linkedin.helix.mock.consumer.ConsumerAdapter;
-import com.linkedin.helix.tools.ClusterSetup;
-
-public class MockStorageProcess
-{
- static Logger logger = Logger.getLogger(MockStorageProcess.class);
-
- public static final String zkServer = "zkSvr";
- public static final String cluster = "cluster";
- public static final String hostAddress = "host";
- public static final String hostPort = "port";
- public static final String relayCluster = "relayCluster";
- public static final String help = "help";
-
- private StorageAdapter storageAdapter;
- private ConsumerAdapter consumerAdapter;
-
- boolean put(Object key, Object val)
- {
- Integer partitionId = 1;
- storageAdapter.isMasterForPartition(partitionId);
- return true;
- }
-
- Object get(Object key)
- {
- Integer partitionId = 1;
- if (storageAdapter.isMasterForPartition(partitionId)
- || storageAdapter.isReplicaForPartition(partitionId))
- {
- return new String("val for " + key);
- }
- return null;
- }
-
- void start(String instanceName, String zkServerAddress, String clusterName,
- String relayClusterName) throws Exception
- {
- storageAdapter = new StorageAdapter(instanceName, zkServerAddress,
- clusterName, relayClusterName);
- storageAdapter.start();
- }
-
- @SuppressWarnings("static-access")
- private static Options constructCommandLineOptions()
- {
- Option helpOption = OptionBuilder.withLongOpt(help)
- .withDescription("Prints command-line options info").create();
-
- Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
- .withDescription("Provide zookeeper address").create();
- zkServerOption.setArgs(1);
- zkServerOption.setRequired(true);
- zkServerOption.setArgName("ZookeeperServerAddress(Required)");
-
- Option clusterOption = OptionBuilder.withLongOpt(cluster)
- .withDescription("Provide cluster name").create();
- clusterOption.setArgs(1);
- clusterOption.setRequired(true);
- clusterOption.setArgName("Cluster name (Required)");
-
- Option hostOption = OptionBuilder.withLongOpt(hostAddress)
- .withDescription("Provide host name").create();
- hostOption.setArgs(1);
- hostOption.setRequired(true);
- hostOption.setArgName("Host name (Required)");
-
- Option portOption = OptionBuilder.withLongOpt(hostPort)
- .withDescription("Provide host port").create();
- portOption.setArgs(1);
- portOption.setRequired(true);
- portOption.setArgName("Host port (Required)");
-
- Option relayClusterOption = OptionBuilder.withLongOpt(relayCluster)
- .withDescription("Provide relay cluster name").create();
- relayClusterOption.setArgs(1);
- relayClusterOption.setRequired(true);
- relayClusterOption.setArgName("Relay cluster name (Required)");
-
- Options options = new Options();
- options.addOption(helpOption);
- options.addOption(zkServerOption);
- options.addOption(clusterOption);
- options.addOption(hostOption);
- options.addOption(portOption);
- options.addOption(relayClusterOption);
- return options;
- }
-
- public static void printUsage(Options cliOptions)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
- }
-
- public static CommandLine processCommandLineArgs(String[] cliArgs)
- throws Exception
- {
- CommandLineParser cliParser = new GnuParser();
- Options cliOptions = constructCommandLineOptions();
- CommandLine cmd = null;
-
- try
- {
- return cliParser.parse(cliOptions, cliArgs);
- } catch (ParseException pe)
- {
- System.err
- .println("CommandLineClient: failed to parse command-line options: "
- + pe.toString());
- printUsage(cliOptions);
- System.exit(1);
- }
- return null;
- }
-
- public static void main(String[] args) throws Exception
- {
- String clusterName = "storage-cluster";
- String relayClusterName = "relay-cluster";
- String zkServerAddress = "localhost:2181";
- String host = "localhost";
- int port = 8900;
- if (args.length > 0)
- {
- CommandLine cmd = processCommandLineArgs(args);
- zkServerAddress = cmd.getOptionValue(zkServer);
- clusterName = cmd.getOptionValue(cluster);
- relayClusterName = cmd.getOptionValue(relayCluster);
- host = cmd.getOptionValue(hostAddress);
- String portString = cmd.getOptionValue(hostPort);
- port = Integer.parseInt(portString);
- }
- // Espresso_driver.py will consume this
- System.out.println("Mock storage started");
- MockStorageProcess process = new MockStorageProcess();
- process.start(host + "_" + port, zkServerAddress, clusterName,
- relayClusterName);
-
- Thread.sleep(10000000);
- }
-}