You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC
[39/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/storage/DummyProcess.java
new file mode 100644
index 0000000..957ff2a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/DummyProcess.java
@@ -0,0 +1,546 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.log4j.Logger;
+
+
+public class DummyProcess
+{
+ private static final Logger logger = Logger.getLogger(DummyProcess.class);
+ public static final String zkServer = "zkSvr";
+ public static final String cluster = "cluster";
+ public static final String hostAddress = "host";
+ public static final String hostPort = "port";
+ public static final String relayCluster = "relayCluster";
+ public static final String help = "help";
+ public static final String clusterViewFile = "clusterViewFile";
+ public static final String transDelay = "transDelay";
+ public static final String helixManagerType = "helixManagerType";
+// public static final String rootNamespace = "rootNamespace";
+
+ private final String _zkConnectString;
+ private final String _clusterName;
+ private final String _instanceName;
+ private DummyStateModelFactory stateModelFactory;
+// private StateMachineEngine genericStateMachineHandler;
+
+ private final FilePropertyStore<ZNRecord> _fileStore;
+
+ private final String _clusterViewFile;
+ private int _transDelayInMs = 0;
+ private final String _clusterMangerType;
+
+ // Convenience constructor without a file store; the manager type defaults to "zk" only when null.
+ public DummyProcess(String zkConnectString,
+ String clusterName,
+ String instanceName,
+ String clusterMangerType,
+ String clusterViewFile, int delay)
+ {
+ this(zkConnectString, clusterName, instanceName, clusterMangerType == null ? "zk" : clusterMangerType, clusterViewFile, delay, null);
+ }
+
+ public DummyProcess(String zkConnectString,
+ String clusterName,
+ String instanceName,
+ String clusterMangerType,
+ String clusterViewFile,
+ int delay,
+ FilePropertyStore<ZNRecord> fileStore)
+ {
+ _zkConnectString = zkConnectString; // handed to getZKHelixManager() by start() for type "zk"
+ _clusterName = clusterName;
+ _instanceName = instanceName;
+ _clusterViewFile = clusterViewFile; // handed to getStaticFileHelixManager() by start() for type "static-file"
+ _clusterMangerType = clusterMangerType; // start() accepts "zk", "static-file" or "dynamic-file", ignoring case
+ _transDelayInMs = delay > 0 ? delay : 0; // negative delays are clamped to 0
+ _fileStore = fileStore; // handed to getDynamicFileHelixManager() by start() for type "dynamic-file"
+ }
+
+ // Blocks the calling thread for transDelay milliseconds to simulate a slow
+ // state transition; returns immediately when transDelay is not positive.
+ static void sleep(long transDelay)
+ {
+ try
+ {
+ if (transDelay > 0) { Thread.sleep(transDelay); }
+ }
+ catch (InterruptedException e)
+ {
+ logger.warn("Interrupted while simulating state transition delay", e);
+ // Restore the interrupt flag: swallowing it would hide the interruption from the caller.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public HelixManager start() throws Exception
+ { // Creates the manager for the configured type, registers the state model factories, connects, and returns it.
+ HelixManager manager = null;
+ // ZooKeeper-backed manager: created from _zkConnectString
+ if (_clusterMangerType.equalsIgnoreCase("zk"))
+ {
+ manager = HelixManagerFactory.getZKHelixManager(_clusterName,
+ _instanceName,
+ InstanceType.PARTICIPANT,
+ _zkConnectString);
+ }
+ // static-file manager: created from _clusterViewFile
+ else if (_clusterMangerType.equalsIgnoreCase("static-file"))
+ {
+ manager = HelixManagerFactory.getStaticFileHelixManager(_clusterName,
+ _instanceName,
+ InstanceType.PARTICIPANT,
+ _clusterViewFile);
+
+ }
+ // dynamic-file manager: created from _fileStore (null when the 6-argument constructor was used)
+ else if (_clusterMangerType.equalsIgnoreCase("dynamic-file"))
+ {
+ manager = HelixManagerFactory.getDynamicFileHelixManager(_clusterName,
+ _instanceName,
+ InstanceType.PARTICIPANT,
+ _fileStore);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
+ }
+ // One factory per state model name registered below; all share the same transition delay.
+ stateModelFactory = new DummyStateModelFactory(_transDelayInMs);
+ DummyLeaderStandbyStateModelFactory stateModelFactory1 = new DummyLeaderStandbyStateModelFactory(_transDelayInMs);
+ DummyOnlineOfflineStateModelFactory stateModelFactory2 = new DummyOnlineOfflineStateModelFactory(_transDelayInMs);
+ // The factories are registered on the manager's engine before connect() is called.
+ StateMachineEngine stateMach = manager.getStateMachineEngine();
+ stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+ stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
+ stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+
+ manager.connect();
+// manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
+ return manager;
+ }
+
+ public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel>
+ {
+ int _delay;
+ // Remembers the transition delay (ms) given to every model this factory creates.
+ public DummyStateModelFactory(int delay)
+ {
+ this._delay = delay;
+ }
+ // Returns a new DummyStateModel configured with this factory's delay; stateUnitKey is not used.
+ @Override
+ public DummyStateModel createNewStateModel(String stateUnitKey)
+ {
+ final DummyStateModel masterSlaveModel = new DummyStateModel();
+ masterSlaveModel.setDelay(this._delay);
+ return masterSlaveModel;
+ }
+ }
+
+ public static class DummyLeaderStandbyStateModelFactory extends StateModelFactory<DummyLeaderStandbyStateModel>
+ {
+ int _delay;
+ // Remembers the transition delay (ms) given to every model this factory creates.
+ public DummyLeaderStandbyStateModelFactory(int delay)
+ {
+ this._delay = delay;
+ }
+ // Returns a new DummyLeaderStandbyStateModel configured with this factory's delay; stateUnitKey is not used.
+ @Override
+ public DummyLeaderStandbyStateModel createNewStateModel(String stateUnitKey)
+ {
+ final DummyLeaderStandbyStateModel leaderStandbyModel = new DummyLeaderStandbyStateModel();
+ leaderStandbyModel.setDelay(this._delay);
+ return leaderStandbyModel;
+ }
+ }
+
+ public static class DummyOnlineOfflineStateModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel>
+ {
+ int _delay;
+ // Remembers the transition delay (ms) given to every model this factory creates.
+ public DummyOnlineOfflineStateModelFactory(int delay)
+ {
+ this._delay = delay;
+ }
+ // Returns a new DummyOnlineOfflineStateModel configured with this factory's delay; stateUnitKey is not used.
+ @Override
+ public DummyOnlineOfflineStateModel createNewStateModel(String stateUnitKey)
+ {
+ final DummyOnlineOfflineStateModel onlineOfflineModel = new DummyOnlineOfflineStateModel();
+ onlineOfflineModel.setDelay(this._delay);
+ return onlineOfflineModel;
+ }
+ }
+ public static class DummyStateModel extends StateModel
+ {
+ int _transDelay = 0;
+
+ public void setDelay(int delay)
+ {
+ _transDelay = delay > 0 ? delay : 0;
+ }
+
+ public void onBecomeSlaveFromOffline(Message message,
+ NotificationContext context)
+ {
+ String db = message.getPartitionName();
+ String instanceName = context.getManager().getInstanceName();
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyStateModel.onBecomeSlaveFromOffline(), instance:" + instanceName
+ + ", db:" + db);
+ }
+
+ public void onBecomeSlaveFromMaster(Message message,
+ NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyStateModel.onBecomeSlaveFromMaster()");
+
+ }
+
+ public void onBecomeMasterFromSlave(Message message,
+ NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyStateModel.onBecomeMasterFromSlave()");
+
+ }
+
+ public void onBecomeOfflineFromSlave(Message message,
+ NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyStateModel.onBecomeOfflineFromSlave()");
+
+ }
+
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
+
+ }
+ }
+
+
+ public static class DummyOnlineOfflineStateModel extends StateModel
+ {
+ int _transDelay = 0; // artificial delay (ms) applied at the start of every transition
+ // Sets the transition delay; negative values are clamped to 0.
+ public void setDelay(int delay)
+ {
+ _transDelay = delay > 0 ? delay : 0;
+ }
+ // Offline-to-Online handler: sleeps for the configured delay, then logs instance and partition.
+ public void onBecomeOnlineFromOffline(Message message,
+ NotificationContext context)
+ {
+ String db = message.getPartitionName();
+ String instanceName = context.getManager().getInstanceName();
+ DummyProcess.sleep(_transDelay);
+ // Log under this class's own name so the output is not mistaken for DummyStateModel's.
+ logger.info("DummyOnlineOfflineStateModel.onBecomeOnlineFromOffline(), instance:" + instanceName
+ + ", db:" + db);
+ }
+ // Online-to-Offline handler: sleeps for the configured delay, then logs.
+ public void onBecomeOfflineFromOnline(Message message,
+ NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyOnlineOfflineStateModel.onBecomeOfflineFromOnline()");
+ }
+ // Offline-to-Dropped handler: sleeps for the configured delay, then logs.
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyOnlineOfflineStateModel.onBecomeDroppedFromOffline()");
+
+ }
+ }
+
+ public static class DummyLeaderStandbyStateModel extends StateModel
+ {
+ int _transDelay = 0;
+
+ public void setDelay(int delay)
+ {
+ _transDelay = delay > 0 ? delay : 0;
+ }
+
+ public void onBecomeLeaderFromStandby(Message message,
+ NotificationContext context)
+ {
+ String db = message.getPartitionName();
+ String instanceName = context.getManager().getInstanceName();
+ DummyProcess.sleep(_transDelay);
+ logger.info("DummyLeaderStandbyStateModel.onBecomeLeaderFromStandby(), instance:" + instanceName
+ + ", db:" + db);
+ }
+
+ public void onBecomeStandbyFromLeader(Message message,
+ NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromLeader()");
+
+ }
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyLeaderStandbyStateModel.onBecomeDroppedFromOffline()");
+
+ }
+
+ public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromOffline()");
+
+ }
+
+ public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
+ {
+ DummyProcess.sleep(_transDelay);
+
+ logger.info("DummyLeaderStandbyStateModel.onBecomeOfflineFromStandby()");
+
+ }
+ }
+
+ // Builds the command-line options of DummyProcess. OptionBuilder is not thread safe, hence synchronized.
+ @SuppressWarnings("static-access")
+ synchronized private static Options constructCommandLineOptions()
+ {
+ Option helpOption = OptionBuilder.withLongOpt(help)
+ .withDescription("Prints command-line options info").create();
+
+ Option clusterOption = OptionBuilder.withLongOpt(cluster)
+ .withDescription("Provide cluster name").create();
+ clusterOption.setArgs(1);
+ clusterOption.setRequired(true);
+ clusterOption.setArgName("Cluster name (Required)");
+
+ Option hostOption = OptionBuilder.withLongOpt(hostAddress)
+ .withDescription("Provide host name").create();
+ hostOption.setArgs(1);
+ hostOption.setRequired(true);
+ hostOption.setArgName("Host name (Required)");
+
+ Option portOption = OptionBuilder.withLongOpt(hostPort)
+ .withDescription("Provide host port").create();
+ portOption.setArgs(1);
+ portOption.setRequired(true);
+ portOption.setArgName("Host port (Required)");
+
+ Option cmTypeOption = OptionBuilder.withLongOpt(helixManagerType)
+ .withDescription("Provide cluster manager type (e.g. 'zk', 'static-file', or 'dynamic-file')").create();
+ cmTypeOption.setArgs(1);
+ cmTypeOption.setRequired(true);
+ cmTypeOption.setArgName("Cluster manager type (e.g. 'zk', 'static-file', or 'dynamic-file') (Required)");
+
+ // --clusterViewFile and --zkSvr are placed in one option group further down
+ Option fileOption = OptionBuilder.withLongOpt(clusterViewFile)
+ .withDescription("Provide a cluster-view file for static-file based cluster manager").create();
+ fileOption.setArgs(1);
+ fileOption.setRequired(true);
+ fileOption.setArgName("Cluster-view file (Required for static-file based cluster manager)");
+
+ Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
+ .withDescription("Provide zookeeper address").create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("ZookeeperServerAddress(Required for zk-based cluster manager)");
+
+ // NOTE(review): Options.addOptionGroup() below appears to reset setRequired(true) on
+ // zkServerOption and fileOption, and the group itself is never marked required, so the
+ // parser should enforce neither of them; it only rejects passing both together.
+ // Confirm against the commons-cli version in use. (A rootNamespace option for the
+ // dynamic-file manager used to be declared at this point and is currently disabled.)
+
+
+ Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
+ .withDescription("Provide state trans delay").create();
+ transDelayOption.setArgs(1);
+ transDelayOption.setRequired(false);
+ transDelayOption.setArgName("Delay time in state transition, in MS");
+
+ OptionGroup optionGroup = new OptionGroup();
+ optionGroup.addOption(zkServerOption);
+ optionGroup.addOption(fileOption);
+ // (the disabled rootNamespace option would have joined this group)
+
+ Options options = new Options();
+ options.addOption(helpOption);
+ options.addOption(clusterOption);
+ options.addOption(hostOption);
+ options.addOption(portOption);
+ options.addOption(transDelayOption);
+ options.addOption(cmTypeOption);
+
+ options.addOptionGroup(optionGroup);
+
+ return options;
+ }
+
+ public static void printUsage(Options cliOptions)
+ {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.printHelp("java " + DummyProcess.class.getName(), cliOptions);
+ }
+
+ public static CommandLine processCommandLineArgs(String[] cliArgs)
+ throws Exception
+ {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ // CommandLine cmd = null;
+
+ try
+ {
+ return cliParser.parse(cliOptions, cliArgs);
+ } catch (ParseException pe)
+ {
+ System.err
+ .println("CommandLineClient: failed to parse command-line options: "
+ + pe.toString());
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ String cmType = "zk";
+ String zkConnectString = "localhost:2181";
+ String clusterName = "testCluster";
+ String instanceName = "localhost_8900";
+ String cvFileStr = null;
+// String rootNs = null;
+ int delay = 0;
+
+ if (args.length > 0)
+ {
+ CommandLine cmd = processCommandLineArgs(args);
+ zkConnectString = cmd.getOptionValue(zkServer);
+ clusterName = cmd.getOptionValue(cluster);
+
+ String host = cmd.getOptionValue(hostAddress);
+ String portString = cmd.getOptionValue(hostPort);
+ int port = Integer.parseInt(portString);
+ instanceName = host + "_" + port;
+ cmType = cmd.getOptionValue(helixManagerType);
+
+ if (cmd.hasOption(clusterViewFile))
+ {
+ cvFileStr = cmd.getOptionValue(clusterViewFile);
+ if (!new File(cvFileStr).exists())
+ {
+ throw new IllegalArgumentException("Cluster-view file:" + cvFileStr
+ + " does NOT exist");
+ }
+ }
+
+// if (cmd.hasOption(rootNamespace))
+// {
+// rootNs = cmd.getOptionValue(rootNamespace);
+// }
+
+ if (cmd.hasOption(transDelay))
+ {
+ try
+ {
+ delay = Integer.parseInt(cmd.getOptionValue(transDelay));
+ if (delay < 0)
+ {
+ throw new Exception("delay must be positive");
+ }
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ delay = 0;
+ }
+ }
+ }
+ // Espresso_driver.py will consume this
+ logger.info("Dummy process started, instanceName:" + instanceName);
+
+ DummyProcess process = new DummyProcess(zkConnectString,
+ clusterName,
+ instanceName,
+ cmType,
+ cvFileStr,
+ delay);
+ HelixManager manager = process.start();
+
+ try
+ {
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException e)
+ {
+ // ClusterManagerFactory.disconnectManagers(instanceName);
+ logger.info("participant:" + instanceName + ", " +
+ Thread.currentThread().getName() + " interrupted");
+// if (manager != null)
+// {
+// manager.disconnect();
+// }
+ }
+ finally
+ {
+ if (manager != null)
+ {
+ manager.disconnect();
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModel.java
new file mode 100644
index 0000000..4a29f16
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModel.java
@@ -0,0 +1,247 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.healthcheck.StatHealthReportProvider;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.consumer.ConsumerAdapter;
+import org.apache.helix.mock.consumer.RelayConfig;
+import org.apache.helix.mock.consumer.RelayConsumer;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+
+public class HealthCheckStateModel extends StateModel
+{
+
+ // private Map<Integer, RelayConsumer> relayConsumersMap;
+ private RelayConsumer consumer = null;
+ private RelayConfig relayConfig;
+ private StorageAdapter storage;
+ private StatHealthReportProvider _provider;
+ //private StatReporterThread _reporterThread;
+ private int _reportInterval;
+ private Map<String, Vector<String>> _reportValues;
+ private CountDownLatch _countdown;
+
+ private static Logger logger = Logger.getLogger(HealthCheckStateModel.class);
+
+ public HealthCheckStateModel(String stateUnitKey, StorageAdapter storageAdapter, StatHealthReportProvider provider,
+ int reportInterval, Map<String, Vector<String>> reportValues, CountDownLatch countdown)
+ {
+ // relayConsumersMap = new HashMap<Integer,RelayConsumer>();
+ storage = storageAdapter;
+ //_reporterThread = new StatReporterThread(provider, reportInterval, reportValues, countdown);
+ // this.consumerAdapter = consumerAdapter;
+ _provider = provider;
+ _reportInterval = reportInterval;
+ _reportValues = reportValues;
+ _countdown = countdown;
+ }
+
+ public RelayConfig getRelayConfig()
+ {
+ return relayConfig;
+ }
+
+ public void setRelayConfig(RelayConfig relayConfig)
+ {
+ this.relayConfig = relayConfig;
+ }
+
+ void checkDebug(Message task) throws Exception
+ {
+ // For debugging purposes
+ if ((Boolean) task.getDebug() == true)
+ {
+ throw new Exception("Exception for debug");
+ }
+ }
+
+ // @transition(to='to',from='from',blah blah..)
+ public void onBecomeSlaveFromOffline(Message task, NotificationContext context)
+ throws Exception
+ {
+
+ logger.info("Becoming slave from offline");
+
+ checkDebug(task);
+
+ String partition = (String) task.getPartitionName();
+ String[] pdata = partition.split("\\.");
+ String dbName = pdata[0];
+
+ // Initializations for the storage node to create right tables, indexes
+ // etc.
+ storage.init(partition);
+ storage.setPermissions(partition, "READONLY");
+
+ // start consuming from the relay
+ consumer = storage.getNewRelayConsumer(dbName, partition);
+ consumer.start();
+ // TODO: how do we know we are caught up?
+
+ logger.info("Became slave for partition " + partition);
+ }
+
+ // @transition(to='to',from='from',blah blah..)
+ public void onBecomeSlaveFromMaster(Message task, NotificationContext context)
+ throws Exception
+ {
+ // Master-to-Slave handler: sets READONLY, calls waitForWrites(), then starts a new relay consumer.
+ logger.info("Becoming slave from master");
+
+ checkDebug(task); // throws when the message has its debug flag set
+
+ String partition = task.getPartitionName();
+ String[] pdata = partition.split("\\.");
+ String dbName = pdata[0]; // text before the first '.' of the partition name
+ storage.setPermissions(partition, "READONLY");
+ storage.waitForWrites(partition);
+
+ // start consuming from the relay
+ consumer = storage.getNewRelayConsumer(dbName, partition);
+ consumer.start();
+
+ logger.info("Became slave for partition " + partition);
+ }
+
+ // @transition(to='to',from='from',blah blah..)
+ public void onBecomeMasterFromSlave(Message task, NotificationContext context)
+ throws Exception
+ {
+ logger.info("Becoming master from slave");
+ // Slave-to-Master handler: flushes and removes the consumer, then opens the partition for writes.
+ checkDebug(task); // throws when the message has its debug flag set
+
+ String partition = (String) task.getPartitionName();
+
+ // stop consumer and refetch from all so all changes are drained
+ consumer.flush(); // blocking call
+
+ // TODO: publish the hwm somewhere
+ long hwm = consumer.getHwm();
+ storage.setHwm(partition, hwm);
+ storage.removeConsumer(partition);
+ consumer = null;
+
+ // set generation in storage
+ Integer generationId = (Integer) task.getGeneration();
+ storage.setGeneration(partition, generationId);
+
+ storage.setPermissions(partition, "READWRITE");
+
+ String[] pdata = partition.split("\\.");
+ String dbName = pdata[0]; // text before the first '.' of the partition name
+
+ HelixManager manager = context.getManager();
+ // start() runs the reporter on its own thread; run() would execute it on this calling thread.
+ //start the reporting thread
+ logger.debug("Starting stats reporting thread");
+ StatReporterThread reporterThread = new StatReporterThread(manager, _provider, dbName, partition,
+ _reportInterval, _reportValues, _countdown);
+ Thread t = new Thread(reporterThread);
+ t.start();
+ logger.info("Became master for partition " + partition);
+ }
+
+ // @transition(to='to',from='from',blah blah..)
+ public void onBecomeOfflineFromSlave(Message task, NotificationContext context)
+ throws Exception
+ {
+
+ logger.info("Becoming offline from slave");
+
+ checkDebug(task);
+
+ String partition = (String) task.getPartitionName();
+
+ consumer.stop();
+ storage.removeConsumer(partition);
+ consumer = null;
+
+ storage.setPermissions(partition, "OFFLINE");
+
+ logger.info("Became offline for partition " + partition);
+ }
+
+ public static String formStatName(String dbName, String partitionName, String metricName)
+ {
+ String statName;
+ statName = "db"+dbName+".partition"+partitionName+"."+metricName;
+ return statName;
+ }
+
+ public class StatReporterThread implements Runnable
+ {
+ private HelixManager _manager;
+ private int _reportInterval;
+ private Map<String, Vector<String>> _reportValues;
+ private CountDownLatch _countdown;
+ private StatHealthReportProvider _provider;
+ private String _dbName;
+ private String _partitionName;
+
+ public StatReporterThread(HelixManager manager, StatHealthReportProvider provider, String dbName,
+ String partitionName, int reportInterval,
+ Map<String,Vector<String>> reportValues, CountDownLatch countdown)
+ {
+ _manager = manager;
+ _reportInterval = reportInterval;
+ _reportValues = reportValues;
+ _countdown = countdown;
+ _provider = provider;
+ _dbName = dbName;
+ _partitionName = partitionName;
+ }
+
+ // Each round: sleep _reportInterval ms, write one value per non-empty metric, transmit; stops after an empty round.
+ @Override
+ public void run()
+ {
+ boolean interrupted = false;
+ boolean doneWithStats = false;
+ while (!doneWithStats) {
+ doneWithStats = true;
+ try {
+ Thread.sleep(_reportInterval);
+ } catch (InterruptedException e) {
+ interrupted = true; // keep reporting; the interrupt flag is restored after the loop
+ logger.error("Unable to sleep, stats not getting staggered, "+e);
+ }
+ for (String metricName : _reportValues.keySet()) {
+ Vector<String> currValues = _reportValues.get(metricName);
+ if (!currValues.isEmpty()) {
+ doneWithStats = false;
+ _provider.writeStat(formStatName(_dbName, _partitionName, metricName), currValues.remove(0), String.valueOf(System.currentTimeMillis()));
+ }
+ }
+ _manager.getHealthReportCollector().transmitHealthReports();
+ }
+ if (interrupted) { Thread.currentThread().interrupt(); }
+ _countdown.countDown();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModelFactory.java
new file mode 100644
index 0000000..15336ef
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModelFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+
+
+ public class HealthCheckStateModelFactory extends StateModelFactory<StateModel>
+ {
+ private static final Logger logger = Logger
+ .getLogger(HealthCheckStateModelFactory.class);
+
+ private final StorageAdapter storageAdapter; // handed to every state model created here
+
+ // The stat provider, report interval, report values and latch are not wired in yet (see TODO below).
+
+ public HealthCheckStateModelFactory(StorageAdapter storage)
+ {
+ storageAdapter = storage;
+ }
+ // Placeholder lookup: always returns null.
+ HealthCheckStateModel getStateModelForPartition(Integer partition)
+ {
+ return null;
+ }
+ // Creates a HealthCheckStateModel for stateUnitKey that uses this factory's storage adapter.
+ @Override
+ public StateModel createNewStateModel(String stateUnitKey)
+ {
+ logger.info("HealthCheckStateModelFactory.getStateModel()");
+ //TODO: fix these parameters
+ return new HealthCheckStateModel(stateUnitKey, storageAdapter, null, 0, null, null);
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockEspressoHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockEspressoHealthReportProvider.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockEspressoHealthReportProvider.java
new file mode 100644
index 0000000..fb97e7f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockEspressoHealthReportProvider.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.healthcheck.HealthReportProvider;
+
+
+ public class MockEspressoHealthReportProvider extends HealthReportProvider {
+ // Mock provider that keeps per-database stat maps in memory, keyed by buildMapKey().
+ private final String _reportName = "RestQueryStats";
+ private HashMap<String, Map<String,String>> _statMap;
+ private final String DB_NAME = "DBName";
+
+ public MockEspressoHealthReportProvider()
+ {
+ // super() is invoked implicitly before the stat map is created.
+ this._statMap = new HashMap<String, Map<String,String>>();
+ }
+ // Returns the key "RestQueryStats@DBName=<dbName>" under which a database's stats are stored.
+ public String buildMapKey(String dbName)
+ {
+ return new StringBuilder(_reportName).append("@").append(DB_NAME).append("=").append(dbName).toString();
+ }
+ // Records statVal for statName, stamped with the current system time in milliseconds.
+ public void setStat(String dbName, String statName, String statVal)
+ {
+ final String nowMillis = String.valueOf(System.currentTimeMillis());
+ this.setStat(dbName, statName, statVal, nowMillis);
+ }
+
+ /*
+ * Same as above but with a caller-supplied timestamp, which makes tests deterministic
+ */
+ public void setStat(String dbName, String statName, String statVal, String timestamp)
+ {
+ final String mapKey = buildMapKey(dbName);
+ if (_statMap.get(mapKey) == null) {
+ _statMap.put(mapKey, new HashMap<String,String>());
+ }
+ final Map<String, String> statsForDb = _statMap.get(mapKey);
+ statsForDb.put(statName, statVal);
+ // The timestamp shares one key per database, so each call overwrites the previous one.
+ statsForDb.put(StatsHolder.TIMESTAMP_NAME, timestamp);
+ }
+ // This mock has no instance-level report; always returns null.
+ @Override
+ public Map<String, String> getRecentHealthReport() {
+ return null;
+ }
+ // Returns the live internal map (not a copy) of key -> stats.
+ @Override
+ public Map<String, Map<String, String>> getRecentPartitionHealthReport() {
+ return this._statMap;
+ }
+ // Removes every recorded stat.
+ @Override
+ public void resetStats() {
+ this._statMap.clear();
+ }
+ // Returns the fixed report name "RestQueryStats".
+ public String getReportName()
+ {
+ return this._reportName;
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockHealthReportParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockHealthReportParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockHealthReportParticipant.java
new file mode 100644
index 0000000..76ac216
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockHealthReportParticipant.java
@@ -0,0 +1,274 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.healthcheck.HealthReportProvider;
+import org.apache.log4j.Logger;
+
+
+public class MockHealthReportParticipant
+{
+ private static final Logger LOG =
+ Logger.getLogger(MockHealthReportParticipant.class);
+ public static final String zkServer = "zkSvr";
+ public static final String cluster = "cluster";
+ public static final String host = "host";
+ public static final String port = "port";
+ public static final String help = "help";
+
+ /**
+  * Health report provider that serves one canned partition report
+  * ("MockRestQueryStats@DBName=BizProfile") and perturbs it on every read.
+  */
+ static class MockHealthReportProvider extends HealthReportProvider
+ {
+   private final String _reportName = "MockRestQueryStats";
+   private final Map<String, Map<String, String>> _mockHealthReport;
+   // one generator shared by all reads instead of a new Random per value
+   private final Random _random = new Random();
+
+   public MockHealthReportProvider()
+   {
+     _mockHealthReport = new HashMap<String, Map<String, String>>();
+
+     Map<String, String> reportMap = new HashMap<String, String>();
+     _mockHealthReport.put("MockRestQueryStats@DBName=BizProfile", reportMap);
+
+     reportMap.put("MeanMysqlLatency", "2.132700625");
+     reportMap.put("95PercentileLatencyLucene", "108.40825525");
+     reportMap.put("99PercentileLatencyMysql", "9.369827");
+     reportMap.put("99PercentileLatencyServer", "167.714208");
+     reportMap.put("95PercentileLatencyMysqlPool", "8.03621375");
+     reportMap.put("95PercentileLatencyServer", "164.68374265");
+     reportMap.put("MinLuceneLatency", "1.765908");
+     reportMap.put("MaxServerLatency", "167.714208");
+     reportMap.put("MeanLuceneLatency", "16.107599458333336");
+     reportMap.put("CollectorName", "RestQueryStats");
+     reportMap.put("MeanLucenePoolLatency", "8.120545333333332");
+     reportMap.put("99PercentileLatencyLucenePool", "65.930564");
+     reportMap.put("MinServerLatency", "0.425272");
+     reportMap.put("IndexStoreMismatchCount", "0");
+     reportMap.put("ErrorCount", "0");
+     reportMap.put("MeanMysqlPoolLatency", "1.0704102916666667");
+     reportMap.put("MinLucenePoolLatency", "0.008189");
+     reportMap.put("MinMysqlLatency", "0.709691");
+     reportMap.put("MaxMysqlPoolLatency", "8.606973");
+     reportMap.put("99PercentileLatencyMysqlPool", "8.606973");
+     reportMap.put("MinMysqlPoolLatency", "0.091883");
+     reportMap.put("MaxLucenePoolLatency", "65.930564");
+     reportMap.put("99PercentileLatencyLucene", "111.78799");
+     reportMap.put("MaxMysqlLatency", "9.369827");
+     reportMap.put("TimeStamp", "1332895048143");
+     reportMap.put("MeanConcurrencyLevel", "1.9");
+     reportMap.put("95PercentileLatencyMysql", "8.96594875");
+     reportMap.put("QueryStartCount", "0");
+     reportMap.put("95PercentileLatencyLucenePool", "63.518656500000006");
+     reportMap.put("MeanServerLatency", "39.5451532");
+     reportMap.put("MaxLuceneLatency", "111.78799");
+     reportMap.put("QuerySuccessCount", "0");
+   }
+
+   /** This mock has no instance-level report; always returns null. */
+   @Override
+   public Map<String, String> getRecentHealthReport()
+   {
+     return null;
+   }
+
+   /** Intentionally a no-op: the canned report is never cleared. */
+   @Override
+   public void resetStats()
+   {
+   }
+
+   /**
+    * Replaces the last character of every value (including non-numeric ones such as
+    * "CollectorName") with a random digit, then returns the live internal map.
+    */
+   @Override
+   public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+   {
+     for (Map<String, String> reportMap : _mockHealthReport.values())
+     {
+       // setValue on an existing entry is not a structural change, so iterating is safe
+       for (Map.Entry<String, String> entry : reportMap.entrySet())
+       {
+         String value = entry.getValue();
+         String lastDigit = "" + _random.nextInt(10);
+         entry.setValue(value.substring(0, value.length() - 1) + lastDigit);
+       }
+     }
+
+     return _mockHealthReport;
+   }
+
+   @Override
+   public String getReportName()
+   {
+     return _reportName;
+   }
+ }
+
+ /**
+  * Job that plugs a MockHealthReportProvider into the manager's health report collector.
+  */
+ static class MockHealthReportJob implements MockJobIntf
+ {
+   /** Nothing to do before the manager connects. */
+   @Override
+   public void doPreConnectJob(HelixManager manager)
+   {
+   }
+
+   /** Registers a fresh MockHealthReportProvider with the health report collector. */
+   @Override
+   public void doPostConnectJob(HelixManager manager)
+   {
+     MockHealthReportProvider provider = new MockHealthReportProvider();
+     manager.getHealthReportCollector().addHealthReportProvider(provider);
+
+//   // set property store path for perf test
+//   final String setPath = "/TEST_PERF/set";
+//   final String updatePath = "/TEST_PERF/update";
+//   manager.getHelixPropertyStore().create(setPath, new ZNRecord(setPath), BaseDataAccessor.Option.PERSISTENT);
+//   manager.getHelixPropertyStore().set(updatePath, new ZNRecord(updatePath), BaseDataAccessor.Option.PERSISTENT);
+   }
+ }
+
+ /**
+  * Builds the CLI options: --help plus the required single-argument options
+  * --cluster, --host, --port and --zkSvr.
+  */
+ // hack OptionBuilder is not thread safe
+ @SuppressWarnings("static-access")
+ synchronized private static Options constructCommandLineOptions()
+ {
+   Options options = new Options();
+   options.addOption(OptionBuilder.withLongOpt(help)
+                                  .withDescription("Prints command-line options info")
+                                  .create());
+   options.addOption(requiredSingleArgOption(cluster,
+                                             "Provide cluster name",
+                                             "Cluster name (Required)"));
+   options.addOption(requiredSingleArgOption(host,
+                                             "Provide host name",
+                                             "Host name (Required)"));
+   options.addOption(requiredSingleArgOption(port,
+                                             "Provide host port",
+                                             "Host port (Required)"));
+   options.addOption(requiredSingleArgOption(zkServer,
+                                             "Provide zookeeper address",
+                                             "Zookeeper server address(Required)"));
+   return options;
+ }
+
+ // Creates a required long option taking exactly one argument; only called from the
+ // synchronized constructCommandLineOptions() because OptionBuilder is not thread safe.
+ @SuppressWarnings("static-access")
+ private static Option requiredSingleArgOption(String longOpt,
+                                               String description,
+                                               String argName)
+ {
+   Option option = OptionBuilder.withLongOpt(longOpt).withDescription(description).create();
+   option.setArgs(1);
+   option.setRequired(true);
+   option.setArgName(argName);
+   return option;
+ }
+
+ /** Prints the usage banner for this class together with the given options. */
+ public static void printUsage(Options cliOptions)
+ {
+   String commandLine = "java " + MockHealthReportParticipant.class.getName();
+   new HelpFormatter().printHelp(commandLine, cliOptions);
+ }
+
+ /**
+  * Parses the command line. On a parse failure the error and usage are printed and the
+  * JVM exits with status 1.
+  *
+  * @param cliArgs raw command-line arguments
+  * @return the parsed command line
+  */
+ public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
+ {
+   final Options options = constructCommandLineOptions();
+   CommandLine parsed = null;
+   try
+   {
+     parsed = new GnuParser().parse(options, cliArgs);
+   }
+   catch (ParseException pe)
+   {
+     System.err.println("CommandLineClient: failed to parse command-line options: "
+         + pe.toString());
+     printUsage(options);
+     System.exit(1);
+   }
+   return parsed;
+ }
+
+ /**
+  * JVM shutdown hook that stops the participant synchronously.
+  * Works for kill -2/-15; does NOT work for kill -9.
+  */
+ static class MockHealthReportParticipantShutdownHook extends Thread
+ {
+   final MockParticipant _participant;
+
+   MockHealthReportParticipantShutdownHook(MockParticipant target)
+   {
+     this._participant = target;
+   }
+
+   @Override
+   public void run()
+   {
+     LOG.info("MockHealthReportParticipantShutdownHook invoked");
+     this._participant.syncStop();
+   }
+ }
+
+ /**
+  * Entry point: parses the arguments, creates a MockParticipant named "host_port" with a
+  * MockHealthReportJob, installs the shutdown hook and runs the participant on the
+  * calling thread.
+  */
+ public static void main(String[] args) throws Exception
+ {
+   final CommandLine cmd = processCommandLineArgs(args);
+   final String instanceName = cmd.getOptionValue(host) + "_" + cmd.getOptionValue(port);
+
+   // the transition argument is left null (no custom transition behaviour)
+   final MockParticipant participant =
+       new MockParticipant(cmd.getOptionValue(cluster),
+                           instanceName,
+                           cmd.getOptionValue(zkServer),
+                           null,
+                           new MockHealthReportJob());
+
+   Thread shutdownHook = new MockHealthReportParticipantShutdownHook(participant);
+   Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+   // Espresso_driver.py will consume this
+   System.out.println("MockHealthReportParticipant process started, instanceName: "
+       + instanceName);
+
+   // run() is invoked directly, so this call blocks on the current thread
+   participant.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockJobIntf.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockJobIntf.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockJobIntf.java
new file mode 100644
index 0000000..b400ab9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockJobIntf.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.HelixManager;
+
+/**
+ * Callbacks a mock participant can be given to run custom work around the point where
+ * its HelixManager is connected.
+ */
+public interface MockJobIntf
+{
+  /** Hook for work to be done with the manager before it is connected. */
+  void doPreConnectJob(HelixManager manager);
+
+  /** Hook for work to be done with the manager after it is connected. */
+  void doPostConnectJob(HelixManager manager);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockParticipant.java
new file mode 100644
index 0000000..bfffcda
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockParticipant.java
@@ -0,0 +1,615 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.mock.storage.DummyProcess.DummyLeaderStandbyStateModelFactory;
+import org.apache.helix.mock.storage.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.log4j.Logger;
+
+
+public class MockParticipant extends Thread
+{
+ private static Logger LOG =
+ Logger.getLogger(MockParticipant.class);
+ private final String _clusterName;
+ private final String _instanceName;
+ // private final String _zkAddr;
+
+ private final CountDownLatch _startCountDown = new CountDownLatch(1);
+ private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+ private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+ private final HelixManager _manager;
+ private final StateModelFactory _msModelFactory;
+ private final MockJobIntf _job;
+
+ /**
+  * Mock master-slave state model. Every transition logs itself and then, when a
+  * MockTransition is configured, delegates to it.
+  */
+ @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
+ public static class MockMSStateModel extends StateModel
+ {
+   // optional behaviour injected into every transition; may be null
+   protected MockTransition _transition;
+
+   public MockMSStateModel(MockTransition transition)
+   {
+     _transition = transition;
+   }
+
+   public void setTransition(MockTransition transition)
+   {
+     _transition = transition;
+   }
+
+   @Transition(to = "SLAVE", from = "OFFLINE")
+   public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException
+   {
+     LOG.info("Become SLAVE from OFFLINE");
+     if (_transition == null)
+     {
+       return;
+     }
+     _transition.doTransition(message, context);
+   }
+
+   @Transition(to = "MASTER", from = "SLAVE")
+   public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException
+   {
+     LOG.info("Become MASTER from SLAVE");
+     if (_transition == null)
+     {
+       return;
+     }
+     _transition.doTransition(message, context);
+   }
+
+   @Transition(to = "SLAVE", from = "MASTER")
+   public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException
+   {
+     LOG.info("Become SLAVE from MASTER");
+     if (_transition == null)
+     {
+       return;
+     }
+     _transition.doTransition(message, context);
+   }
+
+   @Transition(to = "OFFLINE", from = "SLAVE")
+   public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException
+   {
+     LOG.info("Become OFFLINE from SLAVE");
+     if (_transition == null)
+     {
+       return;
+     }
+     _transition.doTransition(message, context);
+   }
+
+   @Transition(to = "DROPPED", from = "OFFLINE")
+   public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws InterruptedException
+   {
+     LOG.info("Become DROPPED from OFFLINE");
+     if (_transition == null)
+     {
+       return;
+     }
+     _transition.doTransition(message, context);
+   }
+
+   @Transition(to = "OFFLINE", from = "ERROR")
+   public void onBecomeOfflineFromError(Message message, NotificationContext context) throws InterruptedException
+   {
+     LOG.info("Become OFFLINE from ERROR");
+     if (_transition == null)
+     {
+       return;
+     }
+     _transition.doTransition(message, context);
+   }
+
+   /** Logs the reset and forwards it to the configured transition, if any. */
+   @Override
+   public void reset()
+   {
+     LOG.info("Default MockMSStateModel.reset() invoked");
+     if (_transition == null)
+     {
+       return;
+     }
+     _transition.doReset();
+   }
+ }
+
+ /** Factory producing MockMSStateModel instances that share one MockTransition. */
+ public static class MockMSModelFactory extends StateModelFactory<MockMSStateModel>
+ {
+   // transition handed to every state model this factory creates; may be null
+   private final MockTransition _transition;
+
+   public MockMSModelFactory()
+   {
+     this(null);
+   }
+
+   public MockMSModelFactory(MockTransition transition)
+   {
+     _transition = transition;
+   }
+
+   /**
+    * Pushes the given transition into every state model currently present in
+    * getStateModelMap(). The transition this factory passes to models it creates
+    * afterwards is not changed by this call.
+    */
+   public void setTrasition(MockTransition transition)
+   {
+     for (MockMSStateModel existing : getStateModelMap().values())
+     {
+       existing.setTransition(transition);
+     }
+   }
+
+   @Override
+   public MockMSStateModel createNewStateModel(String partitionKey)
+   {
+     return new MockMSStateModel(_transition);
+   }
+ }
+
+ /**
+  * Mock STORAGE_DEFAULT_SM_SCHEMATA state model; each transition only logs itself.
+  */
+ @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "DROPPED", "ERROR" })
+ public class MockSchemataStateModel extends StateModel
+ {
+   /** OFFLINE -> MASTER. */
+   @Transition(to = "MASTER", from = "OFFLINE")
+   public void onBecomeMasterFromOffline(Message message, NotificationContext context)
+   {
+     LOG.info("Become MASTER from OFFLINE");
+   }
+
+   /** MASTER -> OFFLINE. */
+   @Transition(to = "OFFLINE", from = "MASTER")
+   public void onBecomeOfflineFromMaster(Message message, NotificationContext context)
+   {
+     LOG.info("Become OFFLINE from MASTER");
+   }
+
+   /** OFFLINE -> DROPPED. */
+   @Transition(to = "DROPPED", from = "OFFLINE")
+   public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+   {
+     LOG.info("Become DROPPED from OFFLINE");
+   }
+
+   /** ERROR -> OFFLINE. */
+   @Transition(to = "OFFLINE", from = "ERROR")
+   public void onBecomeOfflineFromError(Message message, NotificationContext context)
+   {
+     LOG.info("Become OFFLINE from ERROR");
+   }
+ }
+
+ /**
+  * Mock Bootstrap state model (IDLE -> OFFLINE -> BOOTSTRAP -> ONLINE -> OFFLINE);
+  * each transition only logs itself.
+  */
+ @StateModelInfo(initialState = "OFFLINE", states = { "ONLINE", "BOOTSTRAP", "OFFLINE",
+     "IDLE" })
+ public static class MockBootstrapStateModel extends StateModel
+ {
+   // Overwrite the default value of initial state
+   MockBootstrapStateModel()
+   {
+     _currentState = "IDLE";
+   }
+
+   @Transition(to = "OFFLINE", from = "IDLE")
+   public void onBecomeOfflineFromIdle(Message message, NotificationContext context)
+   {
+     LOG.info("Become OFFLINE from IDLE");
+   }
+
+   @Transition(to = "BOOTSTRAP", from = "OFFLINE")
+   public void onBecomeBootstrapFromOffline(Message message, NotificationContext context)
+   {
+     LOG.info("Become BOOTSTRAP from OFFLINE");
+   }
+
+   // "from" must name the declared BOOTSTRAP state (it was misspelled "BOOSTRAP")
+   @Transition(to = "ONLINE", from = "BOOTSTRAP")
+   public void onBecomeOnlineFromBootstrap(Message message, NotificationContext context)
+   {
+     LOG.info("Become ONLINE from BOOTSTRAP");
+   }
+
+   @Transition(to = "OFFLINE", from = "ONLINE")
+   public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
+   {
+     LOG.info("Become OFFLINE from ONLINE");
+   }
+ }
+
+ /** Factory for the mock STORAGE_DEFAULT_SM_SCHEMATA state model. */
+ public class MockSchemataModelFactory extends StateModelFactory<MockSchemataStateModel>
+ {
+   /** Creates a new state model; the partition key is not used. */
+   @Override
+   public MockSchemataStateModel createNewStateModel(String partitionKey)
+   {
+     return new MockSchemataStateModel();
+   }
+ }
+
+ /** Factory for the mock Bootstrap state model. */
+ public static class MockBootstrapModelFactory extends
+     StateModelFactory<MockBootstrapStateModel>
+ {
+   /** Creates a new state model; the partition key is not used. */
+   @Override
+   public MockBootstrapStateModel createNewStateModel(String partitionKey)
+   {
+     return new MockBootstrapStateModel();
+   }
+ }
+
+ // simulate error transition
+ public static class ErrTransition extends MockTransition
+ {
+ private final Map<String, Set<String>> _errPartitions;
+
+ public ErrTransition(Map<String, Set<String>> errPartitions)
+ {
+ if (errPartitions != null)
+ {
+ // change key to upper case
+ _errPartitions = new HashMap<String, Set<String>>();
+ for (String key : errPartitions.keySet())
+ {
+ String upperKey = key.toUpperCase();
+ _errPartitions.put(upperKey, errPartitions.get(key));
+ }
+ }
+ else
+ {
+ _errPartitions = Collections.emptyMap();
+ }
+ }
+
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ {
+ String fromState = message.getFromState();
+ String toState = message.getToState();
+ String partition = message.getPartitionName();
+
+ String key = (fromState + "-" + toState).toUpperCase();
+ if (_errPartitions.containsKey(key) && _errPartitions.get(key).contains(partition))
+ {
+ String errMsg =
+ "IGNORABLE: test throw exception for " + partition + " transit from "
+ + fromState + " to " + toState;
+ throw new RuntimeException(errMsg);
+ }
+ }
+ }
+
+ /** Transition that simulates a slow state change by sleeping for a fixed delay. */
+ public static class SleepTransition extends MockTransition
+ {
+   // sleep time in milliseconds, never negative
+   private final long _delay;
+
+   /**
+    * @param delay sleep time in milliseconds; non-positive values are treated as 0
+    */
+   public SleepTransition(long delay)
+   {
+     _delay = Math.max(delay, 0L);
+   }
+
+   @Override
+   public void doTransition(Message message, NotificationContext context) throws InterruptedException
+   {
+     Thread.sleep(_delay);
+   }
+ }
+
+ /**
+  * Transition that simulates property-store access where every partition touches the
+  * same two znodes: it rewrites "/TEST_PERF/set" and updates "/TEST_PERF/update" with a
+  * per-partition timestamp field. Failures are logged, not propagated.
+  */
+ public static class StoreAccessOneNodeTransition extends MockTransition
+ {
+   @Override
+   public void doTransition(Message message, NotificationContext context) throws InterruptedException
+   {
+     HelixManager manager = context.getManager();
+     ZkHelixPropertyStore<ZNRecord> store = manager.getHelixPropertyStore();
+     final String setPath = "/TEST_PERF/set";
+     final String updatePath = "/TEST_PERF/update";
+     final String key = message.getPartitionName();
+     try
+     {
+       // get/set once
+       ZNRecord record;
+       try
+       {
+         record = store.get(setPath, null, 0);
+       }
+       catch (ZkNoNodeException e)
+       {
+         record = null;
+       }
+       // NOTE(review): get() presumably may also return null for a missing znode instead
+       // of throwing; handle both so the set/update below still happen -- confirm contract
+       if (record == null)
+       {
+         record = new ZNRecord(setPath);
+       }
+       record.setSimpleField("setTimestamp", "" + System.currentTimeMillis());
+       store.set(setPath, record, AccessOption.PERSISTENT);
+
+       // update once
+       store.update(updatePath, new DataUpdater<ZNRecord>()
+       {
+
+         @Override
+         public ZNRecord update(ZNRecord currentData)
+         {
+           if (currentData == null)
+           {
+             currentData = new ZNRecord(updatePath);
+           }
+           currentData.setSimpleField(key, "" + System.currentTimeMillis());
+
+           return currentData;
+         }
+
+       }, AccessOption.PERSISTENT);
+     }
+     catch (Exception e)
+     {
+       // best effort: a failed store access must not fail the state transition
+       LOG.error("fail to access property store for partition: " + key, e);
+     }
+
+   }
+ }
+
+ /**
+  * Transition that simulates property-store access where every partition touches its
+  * own znodes: "/TEST_PERF/set/&lt;partition&gt;" is rewritten and
+  * "/TEST_PERF/update/&lt;partition&gt;" is updated with a timestamp. Failures are
+  * logged, not propagated.
+  */
+ public static class StoreAccessDiffNodeTransition extends MockTransition
+ {
+   @Override
+   public void doTransition(Message message, NotificationContext context) throws InterruptedException
+   {
+     HelixManager manager = context.getManager();
+     ZkHelixPropertyStore<ZNRecord> store = manager.getHelixPropertyStore();
+     final String setPath = "/TEST_PERF/set/" + message.getPartitionName();
+     final String updatePath = "/TEST_PERF/update/" + message.getPartitionName();
+     try
+     {
+       // get/set once
+       ZNRecord record;
+       try
+       {
+         record = store.get(setPath, null, 0);
+       }
+       catch (ZkNoNodeException e)
+       {
+         record = null;
+       }
+       // NOTE(review): get() presumably may also return null for a missing znode instead
+       // of throwing; handle both so the set/update below still happen -- confirm contract
+       if (record == null)
+       {
+         record = new ZNRecord(setPath);
+       }
+       record.setSimpleField("setTimestamp", "" + System.currentTimeMillis());
+       store.set(setPath, record, AccessOption.PERSISTENT);
+
+       // update once
+       store.update(updatePath, new DataUpdater<ZNRecord>()
+       {
+
+         @Override
+         public ZNRecord update(ZNRecord currentData)
+         {
+           if (currentData == null)
+           {
+             currentData = new ZNRecord(updatePath);
+           }
+           currentData.setSimpleField("updateTimestamp", "" + System.currentTimeMillis());
+
+           return currentData;
+         }
+
+       }, AccessOption.PERSISTENT);
+     }
+     catch (Exception e)
+     {
+       // best effort: a failed store access must not fail the state transition
+       LOG.error("fail to access property store at: " + setPath + " / " + updatePath, e);
+     }
+
+   }
+ }
+
+ /** Creates a participant with neither a custom transition nor a job. */
+ public MockParticipant(String clusterName, String instanceName, String zkAddr) throws Exception
+ {
+   this(clusterName, instanceName, zkAddr, null, null);
+ }
+
+ /** Creates a participant with the given transition and no job. */
+ public MockParticipant(String clusterName,
+                        String instanceName,
+                        String zkAddr,
+                        MockTransition transition) throws Exception
+ {
+   this(clusterName, instanceName, zkAddr, transition, null);
+ }
+
+ /**
+  * Creates a participant backed by a new ZK HelixManager of type PARTICIPANT and a
+  * MockMSModelFactory built around the given transition.
+  *
+  * @param transition behaviour injected into master-slave transitions; may be null
+  * @param job optional pre/post connect hooks; may be null
+  */
+ public MockParticipant(String clusterName,
+                        String instanceName,
+                        String zkAddr,
+                        MockTransition transition,
+                        MockJobIntf job) throws Exception
+ {
+   this._clusterName = clusterName;
+   this._instanceName = instanceName;
+   this._job = job;
+   this._msModelFactory = new MockMSModelFactory(transition);
+   this._manager = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                         instanceName,
+                                                         InstanceType.PARTICIPANT,
+                                                         zkAddr);
+ }
+
+ /**
+  * Creates a participant backed by a new ZK HelixManager of type PARTICIPANT that uses
+  * the caller-supplied factory for the "MasterSlave" state model.
+  *
+  * @param factory state model factory to use as given
+  * @param job optional pre/post connect hooks; may be null
+  */
+ public MockParticipant(StateModelFactory factory,
+                        String clusterName,
+                        String instanceName,
+                        String zkAddr,
+                        MockJobIntf job) throws Exception
+ {
+   this._clusterName = clusterName;
+   this._instanceName = instanceName;
+   this._job = job;
+   this._msModelFactory = factory;
+   this._manager = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                         instanceName,
+                                                         InstanceType.PARTICIPANT,
+                                                         zkAddr);
+ }
+
+ /** Returns the factory this participant registers for the "MasterSlave" state model. */
+ public StateModelFactory getStateModelFactory()
+ {
+   return this._msModelFactory;
+ }
+
+ /**
+  * Wraps an existing manager; cluster and instance names are taken from it and no job
+  * is configured.
+  */
+ public MockParticipant(HelixManager manager, MockTransition transition)
+ {
+   this._manager = manager;
+   this._clusterName = manager.getClusterName();
+   this._instanceName = manager.getInstanceName();
+   this._msModelFactory = new MockMSModelFactory(transition);
+   this._job = null;
+ }
+
+ /**
+  * Forwards the transition to the state model factory when it is a MockMSModelFactory;
+  * for any other factory this call does nothing.
+  */
+ public void setTransition(MockTransition transition)
+ {
+   if (!(_msModelFactory instanceof MockMSModelFactory))
+   {
+     return;
+   }
+   MockMSModelFactory msFactory = (MockMSModelFactory) _msModelFactory;
+   msFactory.setTrasition(transition);
+ }
+
+ /** Returns the HelixManager used by this participant. */
+ public HelixManager getManager()
+ {
+   return this._manager;
+ }
+
+ /** Returns the instance name of this participant. */
+ public String getInstanceName()
+ {
+   return this._instanceName;
+ }
+
+ /** Returns the name of the cluster this participant belongs to. */
+ public String getClusterName()
+ {
+   return this._clusterName;
+ }
+
+ /**
+  * Signals run() to stop and blocks until run() has released its stop-finished latch,
+  * which it does after disconnecting the manager. If the wait is interrupted, the
+  * interrupt status of the calling thread is restored and the method returns early.
+  */
+ public void syncStop()
+ {
+   _stopCountDown.countDown();
+   try
+   {
+     _waitStopFinishCountDown.await();
+   }
+   catch (InterruptedException e)
+   {
+     // do not swallow the interrupt: keep it visible to the caller
+     Thread.currentThread().interrupt();
+     LOG.warn("interrupted while waiting for participant: " + _instanceName
+         + " to stop", e);
+   }
+ }
+
+ /**
+  * Starts the participant thread and blocks until run() has released its start latch.
+  * If the wait is interrupted, the interrupt status of the calling thread is restored
+  * and the method returns early.
+  */
+ public void syncStart()
+ {
+   super.start();
+   try
+   {
+     _startCountDown.await();
+   }
+   catch (InterruptedException e)
+   {
+     // do not swallow the interrupt: keep it visible to the caller
+     Thread.currentThread().interrupt();
+     LOG.warn("interrupted while waiting for participant: " + _instanceName
+         + " to start", e);
+   }
+ }
+
+ /**
+  * Registers the state model factories, runs the optional pre-connect job, connects the
+  * manager, runs the optional post-connect job and then waits for the stop signal.
+  * In all cases the manager is disconnected and both the start and stop-finished latches
+  * are released before returning.
+  */
+ @Override
+ public void run()
+ {
+   try
+   {
+     final StateMachineEngine engine = _manager.getStateMachineEngine();
+     engine.registerStateModelFactory("MasterSlave", _msModelFactory);
+
+     final DummyLeaderStandbyStateModelFactory leaderStandbyFactory =
+         new DummyLeaderStandbyStateModelFactory(10);
+     final DummyOnlineOfflineStateModelFactory onlineOfflineFactory =
+         new DummyOnlineOfflineStateModelFactory(10);
+     engine.registerStateModelFactory("LeaderStandby", leaderStandbyFactory);
+     engine.registerStateModelFactory("OnlineOffline", onlineOfflineFactory);
+
+     final MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
+     engine.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
+     // MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
+     // stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
+
+     final boolean hasJob = (_job != null);
+     if (hasJob)
+     {
+       _job.doPreConnectJob(_manager);
+     }
+
+     _manager.connect();
+     // unblock syncStart() as soon as the connection is up
+     _startCountDown.countDown();
+
+     if (hasJob)
+     {
+       _job.doPostConnectJob(_manager);
+     }
+
+     // park here until syncStop() is called
+     _stopCountDown.await();
+   }
+   catch (InterruptedException e)
+   {
+     final String msg = "participant: " + _instanceName + ", "
+         + Thread.currentThread().getName() + " is interrupted";
+     LOG.info(msg);
+     System.err.println(msg);
+   }
+   catch (Exception e)
+   {
+     e.printStackTrace();
+   }
+   finally
+   {
+     // also release the start latch on failure so syncStart() cannot wait forever
+     _startCountDown.countDown();
+
+     synchronized (_manager)
+     {
+       _manager.disconnect();
+     }
+     _waitStopFinishCountDown.countDown();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockStorageProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockStorageProcess.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockStorageProcess.java
new file mode 100644
index 0000000..803192b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockStorageProcess.java
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.manager.zk.ZKDataAccessor;
+import org.apache.helix.mock.consumer.ConsumerAdapter;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.*;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+
+public class MockStorageProcess
+{
+ static Logger logger = Logger.getLogger(MockStorageProcess.class);
+
+ public static final String zkServer = "zkSvr";
+ public static final String cluster = "cluster";
+ public static final String hostAddress = "host";
+ public static final String hostPort = "port";
+ public static final String relayCluster = "relayCluster";
+ public static final String help = "help";
+
+ private StorageAdapter storageAdapter;
+ private ConsumerAdapter consumerAdapter;
+
+ boolean put(Object key, Object val)
+ {
+ Integer partitionId = 1;
+ storageAdapter.isMasterForPartition(partitionId);
+ return true;
+ }
+
+ /**
+  * Mock read: returns "val for " + key when the storage adapter reports this instance as
+  * master or replica for partition 1, otherwise null.
+  */
+ Object get(Object key)
+ {
+   Integer partitionId = 1;
+   if (storageAdapter.isMasterForPartition(partitionId)
+       || storageAdapter.isReplicaForPartition(partitionId))
+   {
+     // the concatenation already yields a fresh String; no extra copy needed
+     return "val for " + key;
+   }
+   return null;
+ }
+
+ /** Creates the storage adapter for the given instance and clusters, then starts it. */
+ void start(String instanceName, String zkServerAddress, String clusterName,
+     String relayClusterName) throws Exception
+ {
+   StorageAdapter adapter =
+       new StorageAdapter(instanceName, zkServerAddress, clusterName, relayClusterName);
+   this.storageAdapter = adapter;
+   adapter.start();
+ }
+
+ /**
+  * Builds the CLI options: --help plus the required single-argument options
+  * --zkSvr, --cluster, --host, --port and --relayCluster.
+  */
+ @SuppressWarnings("static-access")
+ private static Options constructCommandLineOptions()
+ {
+   Options options = new Options();
+   options.addOption(OptionBuilder.withLongOpt(help)
+       .withDescription("Prints command-line options info").create());
+   options.addOption(requiredSingleArgOption(zkServer,
+       "Provide zookeeper address", "ZookeeperServerAddress(Required)"));
+   options.addOption(requiredSingleArgOption(cluster,
+       "Provide cluster name", "Cluster name (Required)"));
+   options.addOption(requiredSingleArgOption(hostAddress,
+       "Provide host name", "Host name (Required)"));
+   options.addOption(requiredSingleArgOption(hostPort,
+       "Provide host port", "Host port (Required)"));
+   options.addOption(requiredSingleArgOption(relayCluster,
+       "Provide relay cluster name", "Relay cluster name (Required)"));
+   return options;
+ }
+
+ // Creates a required long option that takes exactly one argument.
+ @SuppressWarnings("static-access")
+ private static Option requiredSingleArgOption(String longOpt, String description,
+     String argName)
+ {
+   Option option = OptionBuilder.withLongOpt(longOpt).withDescription(description).create();
+   option.setArgs(1);
+   option.setRequired(true);
+   option.setArgName(argName);
+   return option;
+ }
+
+ /** Prints the usage banner for this class together with the given options. */
+ public static void printUsage(Options cliOptions)
+ {
+   HelpFormatter helpFormatter = new HelpFormatter();
+   // the banner must name this class, not ClusterSetup (copy-paste leftover)
+   helpFormatter.printHelp("java " + MockStorageProcess.class.getName(), cliOptions);
+ }
+
+ /**
+  * Parses the command line. On a parse failure the error and usage are printed and the
+  * JVM exits with status 1.
+  *
+  * @param cliArgs raw command-line arguments
+  * @return the parsed command line
+  */
+ public static CommandLine processCommandLineArgs(String[] cliArgs)
+     throws Exception
+ {
+   CommandLineParser cliParser = new GnuParser();
+   Options cliOptions = constructCommandLineOptions();
+
+   try
+   {
+     return cliParser.parse(cliOptions, cliArgs);
+   }
+   catch (ParseException pe)
+   {
+     System.err
+         .println("CommandLineClient: failed to parse command-line options: "
+             + pe.toString());
+     printUsage(cliOptions);
+     System.exit(1);
+   }
+   // only reached if System.exit() returns
+   return null;
+ }
+
+ /**
+  * Entry point. Without arguments the built-in defaults are used; otherwise all
+  * settings come from the command line. Starts the mock storage process as instance
+  * "host_port" and then sleeps.
+  */
+ public static void main(String[] args) throws Exception
+ {
+   // defaults used when no arguments are given
+   String zkServerAddress = "localhost:2181";
+   String clusterName = "storage-cluster";
+   String relayClusterName = "relay-cluster";
+   String host = "localhost";
+   int port = 8900;
+
+   if (args.length > 0)
+   {
+     final CommandLine cmd = processCommandLineArgs(args);
+     zkServerAddress = cmd.getOptionValue(zkServer);
+     clusterName = cmd.getOptionValue(cluster);
+     relayClusterName = cmd.getOptionValue(relayCluster);
+     host = cmd.getOptionValue(hostAddress);
+     port = Integer.parseInt(cmd.getOptionValue(hostPort));
+   }
+   final String instanceName = host + "_" + port;
+
+   // Espresso_driver.py will consume this
+   System.out.println("Mock storage started");
+   new MockStorageProcess().start(instanceName, zkServerAddress, clusterName,
+       relayClusterName);
+
+   Thread.sleep(10000000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockTransition.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockTransition.java
new file mode 100644
index 0000000..ea71458
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockTransition.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Hook object with default implementations that only log that they were
+ * invoked.
+ */
+public class MockTransition
+{
+  private static final Logger LOG = Logger.getLogger(MockTransition.class);
+
+  /**
+   * Called by state model transition functions. The default implementation
+   * only writes a log line.
+   *
+   * @param message the transition message being processed
+   * @param context the notification context of the transition
+   * @throws InterruptedException never thrown by this default implementation;
+   *           presumably declared so that overriding implementations may block
+   */
+  public void doTransition(Message message, NotificationContext context) throws InterruptedException
+  {
+    LOG.info("default doTransition() invoked");
+  }
+
+  /**
+   * Called by the state model reset function. The default implementation only
+   * writes a log line.
+   */
+  public void doReset()
+  {
+    LOG.info("default doReset() invoked");
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/StorageAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/StorageAdapter.java b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageAdapter.java
new file mode 100644
index 0000000..4c190ed
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageAdapter.java
@@ -0,0 +1,206 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.MessageListener;
+import org.apache.helix.mock.consumer.ConsumerAdapter;
+import org.apache.helix.mock.consumer.RelayConfig;
+import org.apache.helix.mock.consumer.RelayConsumer;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+class StorageAdapter
+{
+ HelixManager relayHelixManager;
+ HelixManager storageHelixManager;
+
+ HelixDataAccessor relayClusterClient;
+ HelixDataAccessor storageClusterClient;
+
+ private ExternalViewChangeListener relayViewHolder;
+ private MessageListener messageListener;
+
+ // Map<Object, RelayConsumer> relayConsumersMap;
+ private final ConsumerAdapter consumerAdapter;
+ private final StorageStateModelFactory stateModelFactory;
+
+ class partitionData
+ {
+ long initTime;
+ String permissions;
+ int generationId;
+ long hwm;
+ }
+
+ Map<String, partitionData> hostedPartitions;
+ private final String instanceName;
+
+ private static Logger logger = Logger.getLogger(StorageAdapter.class);
+
+ public StorageAdapter(String instanceName, String zkConnectString,
+ String clusterName, String relayClusterName) throws Exception
+ {
+
+ this.instanceName = instanceName;
+
+ hostedPartitions = new ConcurrentHashMap<String, partitionData>();
+
+ storageHelixManager = HelixManagerFactory
+ .getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
+ zkConnectString);
+ stateModelFactory = new StorageStateModelFactory(this);
+// StateMachineEngine genericStateMachineHandler = new StateMachineEngine();
+ StateMachineEngine stateMach = storageHelixManager.getStateMachineEngine();
+ stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+
+ storageHelixManager.getMessagingService()
+ .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), stateMach);
+ storageHelixManager.connect();
+ storageClusterClient = storageHelixManager.getHelixDataAccessor();
+
+ consumerAdapter = new ConsumerAdapter(instanceName, zkConnectString,
+ relayClusterName);
+ }
+
+ // for every write call
+ public boolean isMasterForPartition(Integer partitionId)
+ {
+ StorageStateModel stateModelForParition = stateModelFactory
+ .getStateModelForPartition(partitionId);
+ return "MASTER".equals(stateModelForParition.getCurrentState());
+ }
+
+ // for every read call depending on read scale config
+ public boolean isReplicaForPartition(Integer partitionId)
+ {
+ StorageStateModel stateModelForParition = stateModelFactory
+ .getStateModelForPartition(partitionId);
+ return "REPLICA".equals(stateModelForParition.getCurrentState());
+ }
+
+ /**
+ * During replication set up which will happen when there is state transition
+ * //TODO may not be nee
+ */
+ void getMasteredPartitions()
+ {
+
+ }
+
+ /*
+ * During replication set up which will happen when there is state transition
+ *
+ * @return
+ */
+ Map<Integer, RelayConfig> getReplicatedPartitions()
+ {
+ return null;
+ }
+
+ /**
+ * Will be used in relay consumers, return can be one RelayConfig or List
+ * depending on implementation
+ */
+ List<RelayConfig> getRelaysForPartition(Integer partitionId)
+ {
+ return null;
+ }
+
+ void updateHighWaterMarkForPartition(String waterMark, Integer partitionId)
+ {
+
+ }
+
+ public void endProcess()
+ {
+
+ }
+
+ public void start()
+ {
+ logger.info("Started storage node " + instanceName);
+ }
+
+ public void setGeneration(String partition, Integer generationId)
+ {
+ partitionData pdata = hostedPartitions.get(partition);
+ pdata.generationId = generationId;
+ hostedPartitions.put(partition, pdata);
+ }
+
+ public void setHwm(String partition, long hwm)
+ {
+ partitionData pdata = hostedPartitions.get(partition);
+ pdata.hwm = hwm;
+ hostedPartitions.put(partition, pdata);
+ }
+
+ // TODO: make sure multiple invocations are possible
+ public void init(String partition)
+ {
+ logger.info("Storage initializing partition " + partition);
+ if (hostedPartitions.containsKey(partition))
+ {
+ logger.info("Partition exists, not reinitializing.");
+ } else
+ {
+ partitionData pdata = new partitionData();
+ pdata.initTime = System.currentTimeMillis();
+ pdata.permissions = "OFFLINE";
+ hostedPartitions.put(partition, pdata);
+ }
+ logger.info("Storage initialized for partition " + partition);
+ }
+
+ public void setPermissions(String partition, String permissions)
+ {
+ partitionData pdata = hostedPartitions.get(partition);
+ pdata.permissions = permissions;
+ hostedPartitions.put(partition, pdata);
+ }
+
+ public void waitForWrites(String partition)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public RelayConsumer getNewRelayConsumer(String dbName, String partition)
+ throws Exception
+ {
+ logger.info("Got new relayconsumer for " + partition);
+ return consumerAdapter.getNewRelayConsumer(dbName, partition);
+ }
+
+ public void removeConsumer(String partition) throws Exception
+ {
+ logger.info("Removing consumer for partition " + partition);
+ consumerAdapter.removeConsumer(partition);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModel.java
new file mode 100644
index 0000000..6b4acd3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModel.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.mock.consumer.RelayConfig;
+import org.apache.helix.mock.consumer.RelayConsumer;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+
+public class StorageStateModel extends StateModel
+{
+
+ // private Map<Integer, RelayConsumer> relayConsumersMap;
+ private RelayConsumer consumer = null;
+ private RelayConfig relayConfig;
+ private final StorageAdapter storage;
+
+ private static Logger logger = Logger.getLogger(StorageStateModel.class);
+
+ public StorageStateModel(String stateUnitKey, StorageAdapter storageAdapter)
+ {
+ // relayConsumersMap = new HashMap<Integer,RelayConsumer>();
+ storage = storageAdapter;
+ // this.consumerAdapter = consumerAdapter;
+ }
+
+ public RelayConfig getRelayConfig()
+ {
+ return relayConfig;
+ }
+
+ public void setRelayConfig(RelayConfig relayConfig)
+ {
+ this.relayConfig = relayConfig;
+ }
+
+ void checkDebug(Message task) throws Exception
+ {
+ // For debugging purposes
+ if (task.getDebug() == true)
+ {
+ throw new Exception("Exception for debug");
+ }
+ }
+
+ // @transition(to='to',from='from',blah blah..)
+ public void onBecomeSlaveFromOffline(Message task, NotificationContext context)
+ throws Exception
+ {
+
+ logger.info("Becoming slave from offline");
+
+ checkDebug(task);
+
+ String partition = task.getPartitionName();
+ String[] pdata = partition.split("\\.");
+ String dbName = pdata[0];
+
+ // Initializations for the storage node to create right tables, indexes
+ // etc.
+ storage.init(partition);
+ storage.setPermissions(partition, "READONLY");
+
+ // start consuming from the relay
+ consumer = storage.getNewRelayConsumer(dbName, partition);
+ consumer.start();
+ // TODO: how do we know we are caught up?
+
+ logger.info("Became slave for partition " + partition);
+ }
+
+ // @transition(to='to',from='from',blah blah..)
+ public void onBecomeSlaveFromMaster(Message task, NotificationContext context)
+ throws Exception
+ {
+
+ logger.info("Becoming slave from master");
+
+ checkDebug(task);
+
+ String partition = task.getPartitionName();
+ String[] pdata = partition.split("\\.");
+ String dbName = pdata[0];
+ storage.setPermissions(partition, "READONLY");
+ storage.waitForWrites(partition);
+
+ // start consuming from the relay
+ consumer = storage.getNewRelayConsumer(dbName, partition);
+ consumer.start();
+
+ logger.info("Becamse slave for partition " + partition);
+ }
+
+ // @transition(to='to',from='from',blah blah..)
+ public void onBecomeMasterFromSlave(Message task, NotificationContext context)
+ throws Exception
+ {
+ logger.info("Becoming master from slave");
+
+ checkDebug(task);
+
+ String partition = task.getPartitionName();
+
+ // stop consumer and refetch from all so all changes are drained
+ consumer.flush(); // blocking call
+
+ // TODO: publish the hwm somewhere
+ long hwm = consumer.getHwm();
+ storage.setHwm(partition, hwm);
+ storage.removeConsumer(partition);
+ consumer = null;
+
+ // set generation in storage
+ Integer generationId = task.getGeneration();
+ storage.setGeneration(partition, generationId);
+
+ storage.setPermissions(partition, "READWRITE");
+
+ logger.info("Became master for partition " + partition);
+ }
+
+ // @transition(to='to',from='from',blah blah..)
+ public void onBecomeOfflineFromSlave(Message task, NotificationContext context)
+ throws Exception
+ {
+
+ logger.info("Becoming offline from slave");
+
+ checkDebug(task);
+
+ String partition = task.getPartitionName();
+
+ consumer.stop();
+ storage.removeConsumer(partition);
+ consumer = null;
+
+ storage.setPermissions(partition, "OFFLINE");
+
+ logger.info("Became offline for partition " + partition);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModelFactory.java
new file mode 100644
index 0000000..132419c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModelFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Factory that creates a {@link StorageStateModel} bound to one
+ * {@link StorageAdapter}.
+ */
+public class StorageStateModelFactory extends StateModelFactory
+{
+  private static final Logger logger = Logger
+      .getLogger(StorageStateModelFactory.class);
+
+  private final StorageAdapter storageAdapter;
+
+  /**
+   * @param storage storage adapter handed to every state model created by this
+   *          factory
+   */
+  public StorageStateModelFactory(StorageAdapter storage)
+  {
+    storageAdapter = storage;
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @param partition partition id
+   * @return currently always {@code null}; callers must handle that
+   */
+  StorageStateModel getStateModelForPartition(Integer partition)
+  {
+    return null;
+  }
+
+  /**
+   * Creates a new state model for the given state unit.
+   *
+   * @param stateUnitKey key of the state unit the model is created for
+   * @return a new {@link StorageStateModel} using this factory's storage adapter
+   */
+  @Override
+  public StateModel createNewStateModel(String stateUnitKey)
+  {
+    logger.info("StorageStateModelFactory.getStateModel()");
+    return new StorageStateModel(stateUnitKey, storageAdapter);
+  }
+
+}