You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC

[39/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/storage/DummyProcess.java
new file mode 100644
index 0000000..957ff2a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/DummyProcess.java
@@ -0,0 +1,546 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.log4j.Logger;
+
+
+public class DummyProcess
+{
+  private static final Logger logger = Logger.getLogger(DummyProcess.class);
+  public static final String zkServer = "zkSvr";
+  public static final String cluster = "cluster";
+  public static final String hostAddress = "host";
+  public static final String hostPort = "port";
+  public static final String relayCluster = "relayCluster";
+  public static final String help = "help";
+  public static final String clusterViewFile = "clusterViewFile";
+  public static final String transDelay = "transDelay";
+  public static final String helixManagerType = "helixManagerType";
+//  public static final String rootNamespace = "rootNamespace";
+
+  private final String _zkConnectString;
+  private final String _clusterName;
+  private final String _instanceName;
+  private DummyStateModelFactory stateModelFactory;
+//  private StateMachineEngine genericStateMachineHandler;
+
+  private final FilePropertyStore<ZNRecord> _fileStore;
+
+  private final String _clusterViewFile;
+  private int _transDelayInMs = 0;
+  private final String _clusterMangerType;
+
+  public DummyProcess(String zkConnectString,
+                      String clusterName,
+                      String instanceName,
+                      String clusterMangerType,
+                      String clusterViewFile,
+                      int delay)
+  {
+    this(zkConnectString, clusterName, instanceName, "zk", clusterViewFile, delay, null);
+  }
+
+  public DummyProcess(String zkConnectString,
+                      String clusterName,
+                      String instanceName,
+                      String clusterMangerType,
+                      String clusterViewFile,
+                      int delay,
+                      FilePropertyStore<ZNRecord> fileStore)
+  {
+    _zkConnectString = zkConnectString;
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _clusterViewFile = clusterViewFile;
+    _clusterMangerType = clusterMangerType;
+    _transDelayInMs = delay > 0 ? delay : 0;
+    _fileStore = fileStore;
+  }
+
+  static void sleep(long transDelay)
+  {
+    try
+    {
+      if (transDelay > 0)
+      {
+        Thread.sleep(transDelay);
+      }
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  public HelixManager start() throws Exception
+  {
+    HelixManager manager = null;
+    // zk cluster manager
+    if (_clusterMangerType.equalsIgnoreCase("zk"))
+    {
+      manager = HelixManagerFactory.getZKHelixManager(_clusterName,
+                                                          _instanceName,
+                                                          InstanceType.PARTICIPANT,
+                                                          _zkConnectString);
+    }
+    // static file cluster manager
+    else if (_clusterMangerType.equalsIgnoreCase("static-file"))
+    {
+      manager = HelixManagerFactory.getStaticFileHelixManager(_clusterName,
+                                                                  _instanceName,
+                                                                  InstanceType.PARTICIPANT,
+                                                                  _clusterViewFile);
+
+    }
+    // dynamic file cluster manager
+    else if (_clusterMangerType.equalsIgnoreCase("dynamic-file"))
+    {
+      manager = HelixManagerFactory.getDynamicFileHelixManager(_clusterName,
+                                                                   _instanceName,
+                                                                   InstanceType.PARTICIPANT,
+                                                                   _fileStore);
+    }
+    else
+    {
+      throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
+    }
+
+    stateModelFactory = new DummyStateModelFactory(_transDelayInMs);
+    DummyLeaderStandbyStateModelFactory stateModelFactory1 = new DummyLeaderStandbyStateModelFactory(_transDelayInMs);
+    DummyOnlineOfflineStateModelFactory stateModelFactory2 = new DummyOnlineOfflineStateModelFactory(_transDelayInMs);
+//    genericStateMachineHandler = new StateMachineEngine();
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
+    stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+
+    manager.connect();
+//    manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
+    return manager;
+  }
+
+  public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel>
+  {
+    int _delay;
+
+    public DummyStateModelFactory(int delay)
+    {
+      _delay = delay;
+    }
+
+    @Override
+    public DummyStateModel createNewStateModel(String stateUnitKey)
+    {
+      DummyStateModel model = new DummyStateModel();
+      model.setDelay(_delay);
+      return model;
+    }
+  }
+
+  public static class DummyLeaderStandbyStateModelFactory extends StateModelFactory<DummyLeaderStandbyStateModel>
+  {
+    int _delay;
+
+    public DummyLeaderStandbyStateModelFactory(int delay)
+    {
+      _delay = delay;
+    }
+
+    @Override
+    public DummyLeaderStandbyStateModel createNewStateModel(String stateUnitKey)
+    {
+      DummyLeaderStandbyStateModel model = new DummyLeaderStandbyStateModel();
+      model.setDelay(_delay);
+      return model;
+    }
+  }
+
+  public static class DummyOnlineOfflineStateModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel>
+  {
+    int _delay;
+
+    public DummyOnlineOfflineStateModelFactory(int delay)
+    {
+      _delay = delay;
+    }
+
+    @Override
+    public DummyOnlineOfflineStateModel createNewStateModel(String stateUnitKey)
+    {
+      DummyOnlineOfflineStateModel model = new DummyOnlineOfflineStateModel();
+      model.setDelay(_delay);
+      return model;
+    }
+  }
+  public static class DummyStateModel extends StateModel
+  {
+    int _transDelay = 0;
+
+    public void setDelay(int delay)
+    {
+      _transDelay = delay > 0 ? delay : 0;
+    }
+
+    public void onBecomeSlaveFromOffline(Message message,
+        NotificationContext context)
+    {
+      String db = message.getPartitionName();
+      String instanceName = context.getManager().getInstanceName();
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyStateModel.onBecomeSlaveFromOffline(), instance:" + instanceName
+                         + ", db:" + db);
+    }
+
+    public void onBecomeSlaveFromMaster(Message message,
+        NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyStateModel.onBecomeSlaveFromMaster()");
+
+    }
+
+    public void onBecomeMasterFromSlave(Message message,
+        NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyStateModel.onBecomeMasterFromSlave()");
+
+    }
+
+    public void onBecomeOfflineFromSlave(Message message,
+        NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyStateModel.onBecomeOfflineFromSlave()");
+
+    }
+
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
+
+    }
+  }
+
+
+  public static class DummyOnlineOfflineStateModel extends StateModel
+  {
+    int _transDelay = 0;
+
+    public void setDelay(int delay)
+    {
+      _transDelay = delay > 0 ? delay : 0;
+    }
+
+    public void onBecomeOnlineFromOffline(Message message,
+        NotificationContext context)
+    {
+      String db = message.getPartitionName();
+      String instanceName = context.getManager().getInstanceName();
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyStateModel.onBecomeOnlineFromOffline(), instance:" + instanceName
+                         + ", db:" + db);
+    }
+
+    public void onBecomeOfflineFromOnline(Message message,
+        NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyStateModel.onBecomeOfflineFromOnline()");
+
+    }
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
+
+    }
+  }
+
+  public static class DummyLeaderStandbyStateModel extends StateModel
+  {
+    int _transDelay = 0;
+
+    public void setDelay(int delay)
+    {
+      _transDelay = delay > 0 ? delay : 0;
+    }
+
+    public void onBecomeLeaderFromStandby(Message message,
+        NotificationContext context)
+    {
+      String db = message.getPartitionName();
+      String instanceName = context.getManager().getInstanceName();
+      DummyProcess.sleep(_transDelay);
+      logger.info("DummyLeaderStandbyStateModel.onBecomeLeaderFromStandby(), instance:" + instanceName
+                         + ", db:" + db);
+    }
+
+    public void onBecomeStandbyFromLeader(Message message,
+        NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromLeader()");
+
+    }
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyLeaderStandbyStateModel.onBecomeDroppedFromOffline()");
+
+    }
+
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromOffline()");
+
+    }
+
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
+    {
+      DummyProcess.sleep(_transDelay);
+
+      logger.info("DummyLeaderStandbyStateModel.onBecomeOfflineFromStandby()");
+
+    }
+  }
+
+  // TODO hack OptionBuilder is not thread safe
+  @SuppressWarnings("static-access")
+  synchronized private static Options constructCommandLineOptions()
+  {
+    Option helpOption = OptionBuilder.withLongOpt(help)
+        .withDescription("Prints command-line options info").create();
+
+    Option clusterOption = OptionBuilder.withLongOpt(cluster)
+        .withDescription("Provide cluster name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option hostOption = OptionBuilder.withLongOpt(hostAddress)
+        .withDescription("Provide host name").create();
+    hostOption.setArgs(1);
+    hostOption.setRequired(true);
+    hostOption.setArgName("Host name (Required)");
+
+    Option portOption = OptionBuilder.withLongOpt(hostPort)
+        .withDescription("Provide host port").create();
+    portOption.setArgs(1);
+    portOption.setRequired(true);
+    portOption.setArgName("Host port (Required)");
+
+    Option cmTypeOption = OptionBuilder.withLongOpt(helixManagerType)
+        .withDescription("Provide cluster manager type (e.g. 'zk', 'static-file', or 'dynamic-file'").create();
+    cmTypeOption.setArgs(1);
+    cmTypeOption.setRequired(true);
+    cmTypeOption.setArgName("Clsuter manager type (e.g. 'zk', 'static-file', or 'dynamic-file') (Required)");
+
+    // add an option group including either --zkSvr or --clusterViewFile
+    Option fileOption = OptionBuilder.withLongOpt(clusterViewFile)
+        .withDescription("Provide a cluster-view file for static-file based cluster manager").create();
+    fileOption.setArgs(1);
+    fileOption.setRequired(true);
+    fileOption.setArgName("Cluster-view file (Required for static-file based cluster manager)");
+
+    Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
+      .withDescription("Provide zookeeper address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required for zk-based cluster manager)");
+
+//    Option rootNsOption = OptionBuilder.withLongOpt(rootNamespace)
+//        .withDescription("Provide root namespace for dynamic-file based cluster manager").create();
+//    rootNsOption.setArgs(1);
+//    rootNsOption.setRequired(true);
+//    rootNsOption.setArgName("Root namespace (Required for dynamic-file based cluster manager)");
+
+
+    Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
+        .withDescription("Provide state trans delay").create();
+    transDelayOption.setArgs(1);
+    transDelayOption.setRequired(false);
+    transDelayOption.setArgName("Delay time in state transition, in MS");
+
+    OptionGroup optionGroup = new OptionGroup();
+    optionGroup.addOption(zkServerOption);
+    optionGroup.addOption(fileOption);
+//    optionGroup.addOption(rootNsOption);
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(clusterOption);
+    options.addOption(hostOption);
+    options.addOption(portOption);
+    options.addOption(transDelayOption);
+    options.addOption(cmTypeOption);
+
+    options.addOptionGroup(optionGroup);
+
+    return options;
+  }
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + DummyProcess.class.getName(), cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs)
+      throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    // CommandLine cmd = null;
+
+    try
+    {
+      return cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe)
+    {
+      System.err
+          .println("CommandLineClient: failed to parse command-line options: "
+              + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    String cmType = "zk";
+    String zkConnectString = "localhost:2181";
+    String clusterName = "testCluster";
+    String instanceName = "localhost_8900";
+    String cvFileStr = null;
+//    String rootNs = null;
+    int delay = 0;
+
+    if (args.length > 0)
+    {
+      CommandLine cmd = processCommandLineArgs(args);
+      zkConnectString = cmd.getOptionValue(zkServer);
+      clusterName = cmd.getOptionValue(cluster);
+
+      String host = cmd.getOptionValue(hostAddress);
+      String portString = cmd.getOptionValue(hostPort);
+      int port = Integer.parseInt(portString);
+      instanceName = host + "_" + port;
+      cmType = cmd.getOptionValue(helixManagerType);
+
+      if (cmd.hasOption(clusterViewFile))
+      {
+        cvFileStr = cmd.getOptionValue(clusterViewFile);
+        if (!new File(cvFileStr).exists())
+        {
+          throw new IllegalArgumentException("Cluster-view file:" + cvFileStr
+                                             + " does NOT exist");
+        }
+      }
+
+//      if (cmd.hasOption(rootNamespace))
+//      {
+//        rootNs = cmd.getOptionValue(rootNamespace);
+//      }
+
+      if (cmd.hasOption(transDelay))
+      {
+        try
+        {
+          delay = Integer.parseInt(cmd.getOptionValue(transDelay));
+          if (delay < 0)
+          {
+            throw new Exception("delay must be positive");
+          }
+        } catch (Exception e)
+        {
+          e.printStackTrace();
+          delay = 0;
+        }
+      }
+    }
+    // Espresso_driver.py will consume this
+    logger.info("Dummy process started, instanceName:" + instanceName);
+
+    DummyProcess process = new DummyProcess(zkConnectString,
+                                            clusterName,
+                                            instanceName,
+                                            cmType,
+                                            cvFileStr,
+                                            delay);
+    HelixManager manager = process.start();
+
+    try
+    {
+      Thread.currentThread().join();
+    }
+    catch (InterruptedException e)
+    {
+      // ClusterManagerFactory.disconnectManagers(instanceName);
+      logger.info("participant:" + instanceName + ", " +
+                   Thread.currentThread().getName() + " interrupted");
+//      if (manager != null)
+//      {
+//        manager.disconnect();
+//      }
+    }
+    finally
+    {
+      if (manager != null)
+      {
+        manager.disconnect();
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModel.java
new file mode 100644
index 0000000..4a29f16
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModel.java
@@ -0,0 +1,247 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.healthcheck.StatHealthReportProvider;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.consumer.ConsumerAdapter;
+import org.apache.helix.mock.consumer.RelayConfig;
+import org.apache.helix.mock.consumer.RelayConsumer;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+
+public class HealthCheckStateModel extends StateModel
+{
+
+  // private Map<Integer, RelayConsumer> relayConsumersMap;
+  private RelayConsumer consumer = null;
+  private RelayConfig relayConfig;
+  private StorageAdapter storage;
+  private StatHealthReportProvider _provider;
+  //private StatReporterThread _reporterThread;
+  private int _reportInterval;
+  private Map<String, Vector<String>> _reportValues;
+  private CountDownLatch _countdown;
+
+  private static Logger logger = Logger.getLogger(HealthCheckStateModel.class);
+
+  public HealthCheckStateModel(String stateUnitKey, StorageAdapter storageAdapter, StatHealthReportProvider provider,
+		  int reportInterval, Map<String, Vector<String>> reportValues, CountDownLatch countdown)
+  {
+    // relayConsumersMap = new HashMap<Integer,RelayConsumer>();
+    storage = storageAdapter;
+   //_reporterThread = new StatReporterThread(provider, reportInterval, reportValues, countdown);
+    // this.consumerAdapter = consumerAdapter;
+   _provider = provider;
+   _reportInterval = reportInterval;
+   _reportValues = reportValues;
+   _countdown = countdown;
+  }
+
+  public RelayConfig getRelayConfig()
+  {
+    return relayConfig;
+  }
+
+  public void setRelayConfig(RelayConfig relayConfig)
+  {
+    this.relayConfig = relayConfig;
+  }
+
+  void checkDebug(Message task) throws Exception
+  {
+    // For debugging purposes
+    if ((Boolean) task.getDebug() == true)
+    {
+      throw new Exception("Exception for debug");
+    }
+  }
+
+  // @transition(to='to',from='from',blah blah..)
+  public void onBecomeSlaveFromOffline(Message task, NotificationContext context)
+      throws Exception
+  {
+
+    logger.info("Becoming slave from offline");
+
+    checkDebug(task);
+
+    String partition = (String) task.getPartitionName();
+    String[] pdata = partition.split("\\.");
+    String dbName = pdata[0];
+
+    // Initializations for the storage node to create right tables, indexes
+    // etc.
+    storage.init(partition);
+    storage.setPermissions(partition, "READONLY");
+
+    // start consuming from the relay
+    consumer = storage.getNewRelayConsumer(dbName, partition);
+    consumer.start();
+    // TODO: how do we know we are caught up?
+
+    logger.info("Became slave for partition " + partition);
+  }
+
+  // @transition(to='to',from='from',blah blah..)
+  public void onBecomeSlaveFromMaster(Message task, NotificationContext context)
+      throws Exception
+  {
+
+    logger.info("Becoming slave from master");
+
+    checkDebug(task);
+
+    String partition = (String) task.getPartitionName();
+    String[] pdata = partition.split("\\.");
+    String dbName = pdata[0];
+    storage.setPermissions(partition, "READONLY");
+    storage.waitForWrites(partition);
+
+    // start consuming from the relay
+    consumer = storage.getNewRelayConsumer(dbName, partition);
+    consumer.start();
+
+    logger.info("Becamse slave for partition " + partition);
+  }
+
+  // @transition(to='to',from='from',blah blah..)
+  public void onBecomeMasterFromSlave(Message task, NotificationContext context)
+      throws Exception
+  {
+    logger.info("Becoming master from slave");
+
+    checkDebug(task);
+
+    String partition = (String) task.getPartitionName();
+
+    // stop consumer and refetch from all so all changes are drained
+    consumer.flush(); // blocking call
+
+    // TODO: publish the hwm somewhere
+    long hwm = consumer.getHwm();
+    storage.setHwm(partition, hwm);
+    storage.removeConsumer(partition);
+    consumer = null;
+
+    // set generation in storage
+    Integer generationId = (Integer) task.getGeneration();
+    storage.setGeneration(partition, generationId);
+
+    storage.setPermissions(partition, "READWRITE");
+
+    String[] pdata = partition.split("\\.");
+    String dbName = pdata[0];
+    
+    HelixManager manager = context.getManager();
+    
+    //start the reporting thread
+    logger.debug("Starting stats reporting thread");
+    StatReporterThread reporterThread = new StatReporterThread(manager, _provider, dbName, partition, 
+    										_reportInterval, _reportValues, _countdown);
+    Thread t = new Thread(reporterThread);
+    t.run();
+    logger.info("Became master for partition " + partition);
+  }
+
+  // @transition(to='to',from='from',blah blah..)
+  public void onBecomeOfflineFromSlave(Message task, NotificationContext context)
+      throws Exception
+  {
+
+    logger.info("Becoming offline from slave");
+
+    checkDebug(task);
+
+    String partition = (String) task.getPartitionName();
+
+    consumer.stop();
+    storage.removeConsumer(partition);
+    consumer = null;
+
+    storage.setPermissions(partition, "OFFLINE");
+
+    logger.info("Became offline for partition " + partition);
+  }
+  
+  public static String formStatName(String dbName, String partitionName, String metricName)
+	{
+		String statName;
+		statName = "db"+dbName+".partition"+partitionName+"."+metricName;
+		return statName;	
+	}
+  
+  public class StatReporterThread implements Runnable
+  {
+	  private HelixManager _manager;
+	  private int _reportInterval;
+	  private Map<String, Vector<String>> _reportValues;
+	  private CountDownLatch _countdown;
+	  private StatHealthReportProvider _provider;
+	  private String _dbName;
+	  private String _partitionName;
+	  
+	public StatReporterThread(HelixManager manager, StatHealthReportProvider provider, String dbName, 
+			String partitionName, int reportInterval, 
+			Map<String,Vector<String>> reportValues, CountDownLatch countdown)
+	{
+		_manager = manager;
+		_reportInterval = reportInterval;
+		_reportValues = reportValues;
+		_countdown = countdown;
+		_provider = provider;
+		_dbName = dbName;
+		_partitionName = partitionName;
+	}
+	  
+	@Override
+	public void run() 
+	{
+		boolean doneWithStats = false;
+		while (!doneWithStats) {
+			doneWithStats = true;
+			try {
+				Thread.sleep(_reportInterval);
+			} catch (InterruptedException e) {
+				logger.error("Unable to sleep, stats not getting staggered, "+e);
+			}
+			for (String metricName : _reportValues.keySet()) {
+				Vector<String> currValues = _reportValues.get(metricName);
+				if (currValues.size() > 0) {
+					doneWithStats = false;
+					String statName = formStatName(_dbName, _partitionName, metricName);
+					String currValue = currValues.remove(0);
+					Long currTimestamp = System.currentTimeMillis();
+					_provider.writeStat(statName, currValue, String.valueOf(currTimestamp));
+				}
+			}
+			_manager.getHealthReportCollector().transmitHealthReports();
+		}
+
+		_countdown.countDown();
+	}
+	  
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModelFactory.java
new file mode 100644
index 0000000..15336ef
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/HealthCheckStateModelFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+
+
+public class HealthCheckStateModelFactory extends StateModelFactory
+{
+  private static Logger logger = Logger
+      .getLogger(HealthCheckStateModelFactory.class);
+
+  private StorageAdapter storageAdapter;
+
+  // private ConsumerAdapter consumerAdapter;
+
+  public HealthCheckStateModelFactory(StorageAdapter storage)
+  {
+    storageAdapter = storage;
+  }
+
+  HealthCheckStateModel getStateModelForPartition(Integer partition)
+  {
+    return null;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String stateUnitKey)
+  {
+    logger.info("HealthCheckStateModelFactory.getStateModel()");
+    //TODO: fix these parameters
+    return new HealthCheckStateModel(stateUnitKey, storageAdapter, null, 0, null, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockEspressoHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockEspressoHealthReportProvider.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockEspressoHealthReportProvider.java
new file mode 100644
index 0000000..fb97e7f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockEspressoHealthReportProvider.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.healthcheck.HealthReportProvider;
+
+
+public class MockEspressoHealthReportProvider extends HealthReportProvider {
+
+	private final String _reportName = "RestQueryStats";
+	private HashMap<String, Map<String,String>> _statMap;
+	private final String DB_NAME = "DBName";
+	
+	public MockEspressoHealthReportProvider()
+	{
+		super();
+		_statMap = new HashMap<String, Map<String,String>>();
+	}
+	
+	public String buildMapKey(String dbName)
+	{
+		return _reportName+"@"+DB_NAME+"="+dbName;
+	}
+	
+	public void setStat(String dbName, String statName, String statVal)
+	{
+		String currTime = String.valueOf(System.currentTimeMillis());
+		setStat(dbName, statName, statVal, currTime);
+	}
+	
+	/*
+	 * This version takes a fixed timestamp to ease with testing
+	 */
+	public void setStat(String dbName, String statName, String statVal, String timestamp)
+	{
+		String key = buildMapKey(dbName);
+		Map<String, String> dbStatMap = _statMap.get(key);
+		if (dbStatMap == null) {
+			dbStatMap = new HashMap<String,String>();
+			_statMap.put(key, dbStatMap);
+		}
+		dbStatMap.put(statName,  statVal);
+		dbStatMap.put(StatsHolder.TIMESTAMP_NAME, timestamp);
+	}
+	
+	@Override
+	public Map<String, String> getRecentHealthReport() {
+		return null;
+	}
+
+	@Override
+	public Map<String, Map<String, String>> getRecentPartitionHealthReport() {
+		return _statMap;
+	}
+	
+	@Override
+	public void resetStats() {
+	_statMap.clear();
+	}
+	
+	public String getReportName() 
+	{
+		return _reportName;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockHealthReportParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockHealthReportParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockHealthReportParticipant.java
new file mode 100644
index 0000000..76ac216
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockHealthReportParticipant.java
@@ -0,0 +1,274 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.healthcheck.HealthReportProvider;
+import org.apache.log4j.Logger;
+
+
+public class MockHealthReportParticipant
+{
+  private static final Logger LOG      =
+                                           Logger.getLogger(MockHealthReportParticipant.class);
+  public static final String  zkServer = "zkSvr";
+  public static final String  cluster  = "cluster";
+  public static final String  host     = "host";
+  public static final String  port     = "port";
+  public static final String  help     = "help";
+
+  static class MockHealthReportProvider extends HealthReportProvider
+  {
+    private final String                           _reportName = "MockRestQueryStats";
+    private final Map<String, Map<String, String>> _mockHealthReport;
+
+    public MockHealthReportProvider()
+    {
+      _mockHealthReport = new HashMap<String, Map<String, String>>();
+
+      Map<String, String> reportMap = new HashMap<String, String>();
+      _mockHealthReport.put("MockRestQueryStats@DBName=BizProfile", reportMap);
+
+      reportMap.put("MeanMysqlLatency", "2.132700625");
+      reportMap.put("95PercentileLatencyLucene", "108.40825525");
+      reportMap.put("99PercentileLatencyMysql", "9.369827");
+      reportMap.put("99PercentileLatencyServer", "167.714208");
+      reportMap.put("95PercentileLatencyMysqlPool", "8.03621375");
+      reportMap.put("95PercentileLatencyServer", "164.68374265");
+      reportMap.put("MinLuceneLatency", "1.765908");
+      reportMap.put("MaxServerLatency", "167.714208");
+      reportMap.put("MeanLuceneLatency", "16.107599458333336");
+      reportMap.put("CollectorName", "RestQueryStats");
+      reportMap.put("MeanLucenePoolLatency", "8.120545333333332");
+      reportMap.put("99PercentileLatencyLucenePool", "65.930564");
+      reportMap.put("MinServerLatency", "0.425272");
+      reportMap.put("IndexStoreMismatchCount", "0");
+      reportMap.put("ErrorCount", "0");
+      reportMap.put("MeanMysqlPoolLatency", "1.0704102916666667");
+      reportMap.put("MinLucenePoolLatency", "0.008189");
+      reportMap.put("MinMysqlLatency", "0.709691");
+      reportMap.put("MaxMysqlPoolLatency", "8.606973");
+      reportMap.put("99PercentileLatencyMysqlPool", "8.606973");
+      reportMap.put("MinMysqlPoolLatency", "0.091883");
+      reportMap.put("MaxLucenePoolLatency", "65.930564");
+      reportMap.put("99PercentileLatencyLucene", "111.78799");
+      reportMap.put("MaxMysqlLatency", "9.369827");
+      reportMap.put("TimeStamp", "1332895048143");
+      reportMap.put("MeanConcurrencyLevel", "1.9");
+      reportMap.put("95PercentileLatencyMysql", "8.96594875");
+      reportMap.put("QueryStartCount", "0");
+      reportMap.put("95PercentileLatencyLucenePool", "63.518656500000006");
+      reportMap.put("MeanServerLatency", "39.5451532");
+      reportMap.put("MaxLuceneLatency", "111.78799");
+      reportMap.put("QuerySuccessCount", "0");
+    }
+
+    @Override
+    public Map<String, String> getRecentHealthReport()
+    {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public void resetStats()
+    {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public Map<String, Map<String, String>> getRecentPartitionHealthReport()
+    {
+      // tweak: randomly change the last digit
+      for (String key1 : _mockHealthReport.keySet())
+      {
+        Map<String, String> reportMap = _mockHealthReport.get(key1);
+        for (String key2 : reportMap.keySet())
+        {
+          String value = reportMap.get(key2);
+          String lastDigit = "" + new Random().nextInt(10);
+          value = value.substring(0, value.length() - 1) + lastDigit;
+          reportMap.put(key2, value);
+        }
+      }
+
+      return _mockHealthReport;
+    }
+
+    @Override
+    public String getReportName()
+    {
+      return _reportName;
+    }
+  }
+
+  static class MockHealthReportJob implements MockJobIntf
+  {
+
+    @Override
+    public void doPreConnectJob(HelixManager manager)
+    {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void doPostConnectJob(HelixManager manager)
+    {
+      // TODO Auto-generated method stub
+      manager.getHealthReportCollector()
+             .addHealthReportProvider(new MockHealthReportProvider());
+
+//      // set property store path for perf test
+//      final String setPath = "/TEST_PERF/set";
+//      final String updatePath = "/TEST_PERF/update";
+//      manager.getHelixPropertyStore().create(setPath, new ZNRecord(setPath), BaseDataAccessor.Option.PERSISTENT);
+//      manager.getHelixPropertyStore().set(updatePath, new ZNRecord(updatePath), BaseDataAccessor.Option.PERSISTENT);
+    }
+
+  }
+
+  // hack OptionBuilder is not thread safe
+  @SuppressWarnings("static-access")
+  synchronized private static Options constructCommandLineOptions()
+  {
+    Option helpOption =
+        OptionBuilder.withLongOpt(help)
+                     .withDescription("Prints command-line options info")
+                     .create();
+
+    Option clusterOption =
+        OptionBuilder.withLongOpt(cluster)
+                     .withDescription("Provide cluster name")
+                     .create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option hostOption =
+        OptionBuilder.withLongOpt(host).withDescription("Provide host name").create();
+    hostOption.setArgs(1);
+    hostOption.setRequired(true);
+    hostOption.setArgName("Host name (Required)");
+
+    Option portOption =
+        OptionBuilder.withLongOpt(port).withDescription("Provide host port").create();
+    portOption.setArgs(1);
+    portOption.setRequired(true);
+    portOption.setArgName("Host port (Required)");
+
+    Option zkServerOption =
+        OptionBuilder.withLongOpt(zkServer)
+                     .withDescription("Provide zookeeper address")
+                     .create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("Zookeeper server address(Required)");
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(clusterOption);
+    options.addOption(hostOption);
+    options.addOption(portOption);
+    options.addOption(zkServerOption);
+
+    return options;
+  }
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + MockHealthReportParticipant.class.getName(),
+                            cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+
+    try
+    {
+
+      return cliParser.parse(cliOptions, cliArgs);
+    }
+    catch (ParseException pe)
+    {
+      System.err.println("CommandLineClient: failed to parse command-line options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  // NOT working for kill -9, working for kill -2/-15
+  static class MockHealthReportParticipantShutdownHook extends Thread
+  {
+    final MockParticipant _participant;
+
+    MockHealthReportParticipantShutdownHook(MockParticipant participant)
+    {
+      _participant = participant;
+    }
+
+    @Override
+    public void run()
+    {
+      LOG.info("MockHealthReportParticipantShutdownHook invoked");
+      _participant.syncStop();
+    }
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    CommandLine cmd = processCommandLineArgs(args);
+    String zkConnectStr = cmd.getOptionValue(zkServer);
+    String clusterName = cmd.getOptionValue(cluster);
+    String hostStr = cmd.getOptionValue(host);
+    String portStr = cmd.getOptionValue(port);
+
+    String instanceName = hostStr + "_" + portStr;
+
+    MockParticipant participant =
+        new MockParticipant(clusterName,
+                            instanceName,
+                            zkConnectStr,
+                            null,   // new StoreAccessDiffNodeTransition(), // new StoreAccessOneNodeTransition(),
+                            new MockHealthReportJob());
+    Runtime.getRuntime()
+           .addShutdownHook(new MockHealthReportParticipantShutdownHook(participant));
+
+    // Espresso_driver.py will consume this
+    System.out.println("MockHealthReportParticipant process started, instanceName: "
+        + instanceName);
+
+    participant.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockJobIntf.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockJobIntf.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockJobIntf.java
new file mode 100644
index 0000000..b400ab9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockJobIntf.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.HelixManager;
+
+public interface MockJobIntf
+{
+  public void doPreConnectJob(HelixManager manager);
+  public void doPostConnectJob(HelixManager manager);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockParticipant.java
new file mode 100644
index 0000000..bfffcda
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockParticipant.java
@@ -0,0 +1,615 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.mock.storage.DummyProcess.DummyLeaderStandbyStateModelFactory;
+import org.apache.helix.mock.storage.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.log4j.Logger;
+
+
+public class MockParticipant extends Thread
+{
+  private static Logger           LOG                      =
+                                                               Logger.getLogger(MockParticipant.class);
+  private final String            _clusterName;
+  private final String            _instanceName;
+  // private final String _zkAddr;
+
+  private final CountDownLatch    _startCountDown          = new CountDownLatch(1);
+  private final CountDownLatch    _stopCountDown           = new CountDownLatch(1);
+  private final CountDownLatch    _waitStopFinishCountDown = new CountDownLatch(1);
+
+  private final HelixManager      _manager;
+  private final StateModelFactory _msModelFactory;
+  private final MockJobIntf       _job;
+
+  // mock master-slave state model
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
+  public static class MockMSStateModel extends StateModel
+  {
+    protected MockTransition _transition;
+
+    public MockMSStateModel(MockTransition transition)
+    {
+      _transition = transition;
+    }
+
+    public void setTransition(MockTransition transition)
+    {
+      _transition = transition;
+    }
+
+    @Transition(to = "SLAVE", from = "OFFLINE")
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become SLAVE from OFFLINE");
+      if (_transition != null)
+      {
+        _transition.doTransition(message, context);
+
+      }
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become MASTER from SLAVE");
+      if (_transition != null)
+      {
+        _transition.doTransition(message, context);
+      }
+    }
+
+    @Transition(to = "SLAVE", from = "MASTER")
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become SLAVE from MASTER");
+      if (_transition != null)
+      {
+        _transition.doTransition(message, context);
+      }
+    }
+
+    @Transition(to = "OFFLINE", from = "SLAVE")
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become OFFLINE from SLAVE");
+      if (_transition != null)
+      {
+        _transition.doTransition(message, context);
+      }
+    }
+
+    @Transition(to = "DROPPED", from = "OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become DROPPED from OFFLINE");
+      if (_transition != null)
+      {
+        _transition.doTransition(message, context);
+      }
+    }
+
+    @Transition(to = "OFFLINE", from = "ERROR")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become OFFLINE from ERROR");
+      // System.err.println("Become OFFLINE from ERROR");
+      if (_transition != null)
+      {
+        _transition.doTransition(message, context);
+      }
+    }
+
+    @Override
+    public void reset()
+    {
+      LOG.info("Default MockMSStateModel.reset() invoked");
+      if (_transition != null)
+      {
+        _transition.doReset();
+      }
+    }
+  }
+
+  // mock master slave state model factory
+  public static class MockMSModelFactory extends StateModelFactory<MockMSStateModel>
+  {
+    private final MockTransition _transition;
+
+    public MockMSModelFactory()
+    {
+      this(null);
+    }
+
+    public MockMSModelFactory(MockTransition transition)
+    {
+      _transition = transition;
+    }
+
+    public void setTrasition(MockTransition transition)
+    {
+      Map<String, MockMSStateModel> stateModelMap = getStateModelMap();
+      for (MockMSStateModel stateModel : stateModelMap.values())
+      {
+        stateModel.setTransition(transition);
+      }
+    }
+
+    @Override
+    public MockMSStateModel createNewStateModel(String partitionKey)
+    {
+      MockMSStateModel model = new MockMSStateModel(_transition);
+
+      return model;
+    }
+  }
+
+  // mock STORAGE_DEFAULT_SM_SCHEMATA state model
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "DROPPED", "ERROR" })
+  public class MockSchemataStateModel extends StateModel
+  {
+    @Transition(to = "MASTER", from = "OFFLINE")
+    public void onBecomeMasterFromOffline(Message message, NotificationContext context)
+    {
+      LOG.info("Become MASTER from OFFLINE");
+    }
+
+    @Transition(to = "OFFLINE", from = "MASTER")
+    public void onBecomeOfflineFromMaster(Message message, NotificationContext context)
+    {
+      LOG.info("Become OFFLINE from MASTER");
+    }
+
+    @Transition(to = "DROPPED", from = "OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+    {
+      LOG.info("Become DROPPED from OFFLINE");
+    }
+
+    @Transition(to = "OFFLINE", from = "ERROR")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context)
+    {
+      LOG.info("Become OFFLINE from ERROR");
+    }
+  }
+
+  // mock Bootstrap state model
+  @StateModelInfo(initialState = "OFFLINE", states = { "ONLINE", "BOOTSTRAP", "OFFLINE",
+      "IDLE" })
+  public static class MockBootstrapStateModel extends StateModel
+  {
+    // Overwrite the default value of intial state
+    MockBootstrapStateModel()
+    {
+      _currentState = "IDLE";
+    }
+
+    @Transition(to = "OFFLINE", from = "IDLE")
+    public void onBecomeOfflineFromIdle(Message message, NotificationContext context)
+    {
+      LOG.info("Become OFFLINE from IDLE");
+    }
+
+    @Transition(to = "BOOTSTRAP", from = "OFFLINE")
+    public void onBecomeBootstrapFromOffline(Message message, NotificationContext context)
+    {
+      LOG.info("Become BOOTSTRAP from OFFLINE");
+    }
+
+    @Transition(to = "ONLINE", from = "BOOSTRAP")
+    public void onBecomeOnlineFromBootstrap(Message message, NotificationContext context)
+    {
+      LOG.info("Become ONLINE from BOOTSTRAP");
+    }
+
+    @Transition(to = "OFFLINE", from = "ONLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
+    {
+      LOG.info("Become OFFLINE from ONLINE");
+    }
+  }
+
+  // mock STORAGE_DEFAULT_SM_SCHEMATA state model factory
+  public class MockSchemataModelFactory extends StateModelFactory<MockSchemataStateModel>
+  {
+    @Override
+    public MockSchemataStateModel createNewStateModel(String partitionKey)
+    {
+      MockSchemataStateModel model = new MockSchemataStateModel();
+      return model;
+    }
+  }
+
+  // mock Bootstrap state model factory
+  public static class MockBootstrapModelFactory extends
+      StateModelFactory<MockBootstrapStateModel>
+  {
+    @Override
+    public MockBootstrapStateModel createNewStateModel(String partitionKey)
+    {
+      MockBootstrapStateModel model = new MockBootstrapStateModel();
+      return model;
+    }
+  }
+
+  // simulate error transition
+  public static class ErrTransition extends MockTransition
+  {
+    private final Map<String, Set<String>> _errPartitions;
+
+    public ErrTransition(Map<String, Set<String>> errPartitions)
+    {
+      if (errPartitions != null)
+      {
+        // change key to upper case
+        _errPartitions = new HashMap<String, Set<String>>();
+        for (String key : errPartitions.keySet())
+        {
+          String upperKey = key.toUpperCase();
+          _errPartitions.put(upperKey, errPartitions.get(key));
+        }
+      }
+      else
+      {
+        _errPartitions = Collections.emptyMap();
+      }
+    }
+
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      String fromState = message.getFromState();
+      String toState = message.getToState();
+      String partition = message.getPartitionName();
+
+      String key = (fromState + "-" + toState).toUpperCase();
+      if (_errPartitions.containsKey(key) && _errPartitions.get(key).contains(partition))
+      {
+        String errMsg =
+            "IGNORABLE: test throw exception for " + partition + " transit from "
+                + fromState + " to " + toState;
+        throw new RuntimeException(errMsg);
+      }
+    }
+  }
+
+  // simulate long transition
+  public static class SleepTransition extends MockTransition
+  {
+    private final long _delay;
+
+    public SleepTransition(long delay)
+    {
+      _delay = delay > 0 ? delay : 0;
+    }
+
+    @Override
+    public void doTransition(Message message, NotificationContext context) throws InterruptedException
+    {
+      Thread.sleep(_delay);
+
+    }
+  }
+
+  // simulate access property store and update one znode
+  public static class StoreAccessOneNodeTransition extends MockTransition
+  {
+    @Override
+    public void doTransition(Message message, NotificationContext context) throws InterruptedException
+    {
+      HelixManager manager = context.getManager();
+      ZkHelixPropertyStore<ZNRecord> store = manager.getHelixPropertyStore();
+      final String setPath = "/TEST_PERF/set";
+      final String updatePath = "/TEST_PERF/update";
+      final String key = message.getPartitionName();
+      try
+      {
+        // get/set once
+        ZNRecord record = null;
+        try
+        {
+          record = store.get(setPath, null, 0);
+        }
+        catch (ZkNoNodeException e)
+        {
+          record = new ZNRecord(setPath);
+        }
+        record.setSimpleField("setTimestamp", "" + System.currentTimeMillis());
+        store.set(setPath, record, AccessOption.PERSISTENT);
+
+        // update once
+        store.update(updatePath, new DataUpdater<ZNRecord>()
+        {
+
+          @Override
+          public ZNRecord update(ZNRecord currentData)
+          {
+            if (currentData == null)
+            {
+              currentData = new ZNRecord(updatePath);
+            }
+            currentData.setSimpleField(key, "" + System.currentTimeMillis());
+
+            return currentData;
+          }
+
+        }, AccessOption.PERSISTENT);
+      }
+      catch (Exception e)
+      {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+
+    }
+  }
+
+  // simulate access property store and update different znodes
+  public static class StoreAccessDiffNodeTransition extends MockTransition
+  {
+    @Override
+    public void doTransition(Message message, NotificationContext context) throws InterruptedException
+    {
+      HelixManager manager = context.getManager();
+      ZkHelixPropertyStore<ZNRecord> store = manager.getHelixPropertyStore();
+      final String setPath = "/TEST_PERF/set/" + message.getPartitionName();
+      final String updatePath = "/TEST_PERF/update/" + message.getPartitionName();
+      // final String key = message.getPartitionName();
+      try
+      {
+        // get/set once
+        ZNRecord record = null;
+        try
+        {
+          record = store.get(setPath, null, 0);
+        }
+        catch (ZkNoNodeException e)
+        {
+          record = new ZNRecord(setPath);
+        }
+        record.setSimpleField("setTimestamp", "" + System.currentTimeMillis());
+        store.set(setPath, record, AccessOption.PERSISTENT);
+
+        // update once
+        store.update(updatePath, new DataUpdater<ZNRecord>()
+        {
+
+          @Override
+          public ZNRecord update(ZNRecord currentData)
+          {
+            if (currentData == null)
+            {
+              currentData = new ZNRecord(updatePath);
+            }
+            currentData.setSimpleField("updateTimestamp", "" + System.currentTimeMillis());
+
+            return currentData;
+          }
+
+        }, AccessOption.PERSISTENT);
+      }
+      catch (Exception e)
+      {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+
+    }
+  }
+  
+  public MockParticipant(String clusterName, String instanceName, String zkAddr) throws Exception
+  {
+    this(clusterName, instanceName, zkAddr, null, null);
+  }
+
+  public MockParticipant(String clusterName,
+                         String instanceName,
+                         String zkAddr,
+                         MockTransition transition) throws Exception
+  {
+    this(clusterName, instanceName, zkAddr, transition, null);
+  }
+
+  public MockParticipant(String clusterName,
+                         String instanceName,
+                         String zkAddr,
+                         MockTransition transition,
+                         MockJobIntf job) throws Exception
+  {
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _msModelFactory = new MockMSModelFactory(transition);
+
+    _manager =
+        HelixManagerFactory.getZKHelixManager(_clusterName,
+                                              _instanceName,
+                                              InstanceType.PARTICIPANT,
+                                              zkAddr);
+    _job = job;
+  }
+
+  public MockParticipant(StateModelFactory factory,
+                         String clusterName,
+                         String instanceName,
+                         String zkAddr,
+                         MockJobIntf job) throws Exception
+  {
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _msModelFactory = factory;
+
+    _manager =
+        HelixManagerFactory.getZKHelixManager(_clusterName,
+                                              _instanceName,
+                                              InstanceType.PARTICIPANT,
+                                              zkAddr);
+    _job = job;
+  }
+
+  public StateModelFactory getStateModelFactory()
+  {
+    return _msModelFactory;
+  }
+
+  public MockParticipant(HelixManager manager, MockTransition transition)
+  {
+    _clusterName = manager.getClusterName();
+    _instanceName = manager.getInstanceName();
+    _manager = manager;
+
+    _msModelFactory = new MockMSModelFactory(transition);
+    _job = null;
+  }
+
+  public void setTransition(MockTransition transition)
+  {
+    if (_msModelFactory instanceof MockMSModelFactory)
+    {
+      ((MockMSModelFactory) _msModelFactory).setTrasition(transition);
+    }
+  }
+
+  public HelixManager getManager()
+  {
+    return _manager;
+  }
+
+  public String getInstanceName()
+  {
+    return _instanceName;
+  }
+
+  public String getClusterName()
+  {
+    return _clusterName;
+  }
+
+  public void syncStop()
+  {
+    _stopCountDown.countDown();
+    try
+    {
+      _waitStopFinishCountDown.await();
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+    // synchronized (_manager)
+    // {
+    // _manager.disconnect();
+    // }
+  }
+
+  public void syncStart()
+  {
+    super.start();
+    try
+    {
+      _startCountDown.await();
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void run()
+  {
+    try
+    {
+      StateMachineEngine stateMach = _manager.getStateMachineEngine();
+      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+
+      DummyLeaderStandbyStateModelFactory lsModelFactory =
+          new DummyLeaderStandbyStateModelFactory(10);
+      DummyOnlineOfflineStateModelFactory ofModelFactory =
+          new DummyOnlineOfflineStateModelFactory(10);
+      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+
+      MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
+      stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
+      // MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
+      // stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
+
+      if (_job != null)
+      {
+        _job.doPreConnectJob(_manager);
+      }
+
+      _manager.connect();
+      _startCountDown.countDown();
+
+      if (_job != null)
+      {
+        _job.doPostConnectJob(_manager);
+      }
+
+      _stopCountDown.await();
+    }
+    catch (InterruptedException e)
+    {
+      String msg =
+          "participant: " + _instanceName + ", " + Thread.currentThread().getName()
+              + " is interrupted";
+      LOG.info(msg);
+      System.err.println(msg);
+    }
+    catch (Exception e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    finally
+    {
+      _startCountDown.countDown();
+
+      synchronized (_manager)
+      {
+        _manager.disconnect();
+      }
+      _waitStopFinishCountDown.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockStorageProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockStorageProcess.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockStorageProcess.java
new file mode 100644
index 0000000..803192b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockStorageProcess.java
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.manager.zk.ZKDataAccessor;
+import org.apache.helix.mock.consumer.ConsumerAdapter;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.*;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+
+public class MockStorageProcess
+{
+  static Logger logger = Logger.getLogger(MockStorageProcess.class);
+
+  public static final String zkServer = "zkSvr";
+  public static final String cluster = "cluster";
+  public static final String hostAddress = "host";
+  public static final String hostPort = "port";
+  public static final String relayCluster = "relayCluster";
+  public static final String help = "help";
+
+  private StorageAdapter storageAdapter;
+  private ConsumerAdapter consumerAdapter;
+
+  boolean put(Object key, Object val)
+  {
+    Integer partitionId = 1;
+    storageAdapter.isMasterForPartition(partitionId);
+    return true;
+  }
+
+  Object get(Object key)
+  {
+    Integer partitionId = 1;
+    if (storageAdapter.isMasterForPartition(partitionId)
+        || storageAdapter.isReplicaForPartition(partitionId))
+    {
+      return new String("val for " + key);
+    }
+    return null;
+  }
+
+  void start(String instanceName, String zkServerAddress, String clusterName,
+      String relayClusterName) throws Exception
+  {
+    storageAdapter = new StorageAdapter(instanceName, zkServerAddress,
+        clusterName, relayClusterName);
+    storageAdapter.start();
+  }
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions()
+  {
+    Option helpOption = OptionBuilder.withLongOpt(help)
+        .withDescription("Prints command-line options info").create();
+
+    Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
+        .withDescription("Provide zookeeper address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+    Option clusterOption = OptionBuilder.withLongOpt(cluster)
+        .withDescription("Provide cluster name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option hostOption = OptionBuilder.withLongOpt(hostAddress)
+        .withDescription("Provide host name").create();
+    hostOption.setArgs(1);
+    hostOption.setRequired(true);
+    hostOption.setArgName("Host name (Required)");
+
+    Option portOption = OptionBuilder.withLongOpt(hostPort)
+        .withDescription("Provide host port").create();
+    portOption.setArgs(1);
+    portOption.setRequired(true);
+    portOption.setArgName("Host port (Required)");
+
+    Option relayClusterOption = OptionBuilder.withLongOpt(relayCluster)
+        .withDescription("Provide relay cluster name").create();
+    relayClusterOption.setArgs(1);
+    relayClusterOption.setRequired(true);
+    relayClusterOption.setArgName("Relay cluster name (Required)");
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(zkServerOption);
+    options.addOption(clusterOption);
+    options.addOption(hostOption);
+    options.addOption(portOption);
+    options.addOption(relayClusterOption);
+    return options;
+  }
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs)
+      throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try
+    {
+      return cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe)
+    {
+      System.err
+          .println("CommandLineClient: failed to parse command-line options: "
+              + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    String clusterName = "storage-cluster";
+    String relayClusterName = "relay-cluster";
+    String zkServerAddress = "localhost:2181";
+    String host = "localhost";
+    int port = 8900;
+    if (args.length > 0)
+    {
+      CommandLine cmd = processCommandLineArgs(args);
+      zkServerAddress = cmd.getOptionValue(zkServer);
+      clusterName = cmd.getOptionValue(cluster);
+      relayClusterName = cmd.getOptionValue(relayCluster);
+      host = cmd.getOptionValue(hostAddress);
+      String portString = cmd.getOptionValue(hostPort);
+      port = Integer.parseInt(portString);
+    }
+    // Espresso_driver.py will consume this
+    System.out.println("Mock storage started");
+    MockStorageProcess process = new MockStorageProcess();
+    process.start(host + "_" + port, zkServerAddress, clusterName,
+        relayClusterName);
+
+    Thread.sleep(10000000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/MockTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/MockTransition.java b/helix-core/src/test/java/org/apache/helix/mock/storage/MockTransition.java
new file mode 100644
index 0000000..ea71458
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/MockTransition.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+public class MockTransition
+{
+  private static Logger LOG = Logger.getLogger(MockTransition.class);
+
+  // called by state model transition functions
+  public void doTransition(Message message, NotificationContext context) throws InterruptedException
+  {
+    LOG.info("default doTransition() invoked");
+  }
+
+  // called by state model reset function
+  public void doReset()
+  {
+    LOG.info("default doReset() invoked");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/StorageAdapter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/StorageAdapter.java b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageAdapter.java
new file mode 100644
index 0000000..4c190ed
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageAdapter.java
@@ -0,0 +1,206 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.MessageListener;
+import org.apache.helix.mock.consumer.ConsumerAdapter;
+import org.apache.helix.mock.consumer.RelayConfig;
+import org.apache.helix.mock.consumer.RelayConsumer;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+class StorageAdapter
+{
+  HelixManager relayHelixManager;
+  HelixManager storageHelixManager;
+
+  HelixDataAccessor relayClusterClient;
+  HelixDataAccessor storageClusterClient;
+
+  private ExternalViewChangeListener relayViewHolder;
+  private MessageListener messageListener;
+
+  // Map<Object, RelayConsumer> relayConsumersMap;
+  private final ConsumerAdapter consumerAdapter;
+  private final StorageStateModelFactory stateModelFactory;
+
+  class partitionData
+  {
+    long initTime;
+    String permissions;
+    int generationId;
+    long hwm;
+  }
+
+  Map<String, partitionData> hostedPartitions;
+  private final String instanceName;
+
+  private static Logger logger = Logger.getLogger(StorageAdapter.class);
+
+  public StorageAdapter(String instanceName, String zkConnectString,
+      String clusterName, String relayClusterName) throws Exception
+  {
+
+    this.instanceName = instanceName;
+
+    hostedPartitions = new ConcurrentHashMap<String, partitionData>();
+
+    storageHelixManager = HelixManagerFactory
+        .getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
+                             zkConnectString);
+    stateModelFactory = new StorageStateModelFactory(this);
+//    StateMachineEngine genericStateMachineHandler = new StateMachineEngine();
+    StateMachineEngine stateMach = storageHelixManager.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+
+    storageHelixManager.getMessagingService()
+      .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), stateMach);
+    storageHelixManager.connect();
+    storageClusterClient = storageHelixManager.getHelixDataAccessor();
+
+    consumerAdapter = new ConsumerAdapter(instanceName, zkConnectString,
+        relayClusterName);
+  }
+
+  // for every write call
+  public boolean isMasterForPartition(Integer partitionId)
+  {
+    StorageStateModel stateModelForParition = stateModelFactory
+        .getStateModelForPartition(partitionId);
+    return "MASTER".equals(stateModelForParition.getCurrentState());
+  }
+
+  // for every read call depending on read scale config
+  public boolean isReplicaForPartition(Integer partitionId)
+  {
+    StorageStateModel stateModelForParition = stateModelFactory
+        .getStateModelForPartition(partitionId);
+    return "REPLICA".equals(stateModelForParition.getCurrentState());
+  }
+
+  /**
+   * During replication set up which will happen when there is state transition
+   * //TODO may not be nee
+   */
+  void getMasteredPartitions()
+  {
+
+  }
+
+  /*
+   * During replication set up which will happen when there is state transition
+   *
+   * @return
+   */
+  Map<Integer, RelayConfig> getReplicatedPartitions()
+  {
+    return null;
+  }
+
+  /**
+   * Will be used in relay consumers, return can be one RelayConfig or List
+   * depending on implementation
+   */
+  List<RelayConfig> getRelaysForPartition(Integer partitionId)
+  {
+    return null;
+  }
+
+  void updateHighWaterMarkForPartition(String waterMark, Integer partitionId)
+  {
+
+  }
+
+  public void endProcess()
+  {
+
+  }
+
+  public void start()
+  {
+    logger.info("Started storage node " + instanceName);
+  }
+
+  public void setGeneration(String partition, Integer generationId)
+  {
+    partitionData pdata = hostedPartitions.get(partition);
+    pdata.generationId = generationId;
+    hostedPartitions.put(partition, pdata);
+  }
+
+  public void setHwm(String partition, long hwm)
+  {
+    partitionData pdata = hostedPartitions.get(partition);
+    pdata.hwm = hwm;
+    hostedPartitions.put(partition, pdata);
+  }
+
+  // TODO: make sure multiple invocations are possible
+  public void init(String partition)
+  {
+    logger.info("Storage initializing partition " + partition);
+    if (hostedPartitions.containsKey(partition))
+    {
+      logger.info("Partition exists, not reinitializing.");
+    } else
+    {
+      partitionData pdata = new partitionData();
+      pdata.initTime = System.currentTimeMillis();
+      pdata.permissions = "OFFLINE";
+      hostedPartitions.put(partition, pdata);
+    }
+    logger.info("Storage initialized for partition " + partition);
+  }
+
+  public void setPermissions(String partition, String permissions)
+  {
+    partitionData pdata = hostedPartitions.get(partition);
+    pdata.permissions = permissions;
+    hostedPartitions.put(partition, pdata);
+  }
+
+  public void waitForWrites(String partition)
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  public RelayConsumer getNewRelayConsumer(String dbName, String partition)
+      throws Exception
+  {
+    logger.info("Got new relayconsumer for " + partition);
+    return consumerAdapter.getNewRelayConsumer(dbName, partition);
+  }
+
+  public void removeConsumer(String partition) throws Exception
+  {
+    logger.info("Removing consumer for partition " + partition);
+    consumerAdapter.removeConsumer(partition);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModel.java
new file mode 100644
index 0000000..6b4acd3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModel.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.mock.consumer.RelayConfig;
+import org.apache.helix.mock.consumer.RelayConsumer;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+
+public class StorageStateModel extends StateModel
+{
+
+  // private Map<Integer, RelayConsumer> relayConsumersMap;
+  private RelayConsumer consumer = null;
+  private RelayConfig relayConfig;
+  private final StorageAdapter storage;
+
+  private static Logger logger = Logger.getLogger(StorageStateModel.class);
+
+  public StorageStateModel(String stateUnitKey, StorageAdapter storageAdapter)
+  {
+    // relayConsumersMap = new HashMap<Integer,RelayConsumer>();
+    storage = storageAdapter;
+    // this.consumerAdapter = consumerAdapter;
+  }
+
+  public RelayConfig getRelayConfig()
+  {
+    return relayConfig;
+  }
+
+  public void setRelayConfig(RelayConfig relayConfig)
+  {
+    this.relayConfig = relayConfig;
+  }
+
+  void checkDebug(Message task) throws Exception
+  {
+    // For debugging purposes
+    if (task.getDebug() == true)
+    {
+      throw new Exception("Exception for debug");
+    }
+  }
+
+  // @transition(to='to',from='from',blah blah..)
+  public void onBecomeSlaveFromOffline(Message task, NotificationContext context)
+      throws Exception
+  {
+
+    logger.info("Becoming slave from offline");
+
+    checkDebug(task);
+
+    String partition = task.getPartitionName();
+    String[] pdata = partition.split("\\.");
+    String dbName = pdata[0];
+
+    // Initializations for the storage node to create right tables, indexes
+    // etc.
+    storage.init(partition);
+    storage.setPermissions(partition, "READONLY");
+
+    // start consuming from the relay
+    consumer = storage.getNewRelayConsumer(dbName, partition);
+    consumer.start();
+    // TODO: how do we know we are caught up?
+
+    logger.info("Became slave for partition " + partition);
+  }
+
+  // @transition(to='to',from='from',blah blah..)
+  public void onBecomeSlaveFromMaster(Message task, NotificationContext context)
+      throws Exception
+  {
+
+    logger.info("Becoming slave from master");
+
+    checkDebug(task);
+
+    String partition = task.getPartitionName();
+    String[] pdata = partition.split("\\.");
+    String dbName = pdata[0];
+    storage.setPermissions(partition, "READONLY");
+    storage.waitForWrites(partition);
+
+    // start consuming from the relay
+    consumer = storage.getNewRelayConsumer(dbName, partition);
+    consumer.start();
+
+    logger.info("Becamse slave for partition " + partition);
+  }
+
+  // @transition(to='to',from='from',blah blah..)
+  public void onBecomeMasterFromSlave(Message task, NotificationContext context)
+      throws Exception
+  {
+    logger.info("Becoming master from slave");
+
+    checkDebug(task);
+
+    String partition = task.getPartitionName();
+
+    // stop consumer and refetch from all so all changes are drained
+    consumer.flush(); // blocking call
+
+    // TODO: publish the hwm somewhere
+    long hwm = consumer.getHwm();
+    storage.setHwm(partition, hwm);
+    storage.removeConsumer(partition);
+    consumer = null;
+
+    // set generation in storage
+    Integer generationId = task.getGeneration();
+    storage.setGeneration(partition, generationId);
+
+    storage.setPermissions(partition, "READWRITE");
+
+    logger.info("Became master for partition " + partition);
+  }
+
+  // @transition(to='to',from='from',blah blah..)
+  public void onBecomeOfflineFromSlave(Message task, NotificationContext context)
+      throws Exception
+  {
+
+    logger.info("Becoming offline from slave");
+
+    checkDebug(task);
+
+    String partition = task.getPartitionName();
+
+    consumer.stop();
+    storage.removeConsumer(partition);
+    consumer = null;
+
+    storage.setPermissions(partition, "OFFLINE");
+
+    logger.info("Became offline for partition " + partition);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModelFactory.java
new file mode 100644
index 0000000..132419c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/mock/storage/StorageStateModelFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.mock.storage;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+
+
+public class StorageStateModelFactory extends StateModelFactory
+{
+  private static Logger logger = Logger
+      .getLogger(StorageStateModelFactory.class);
+
+  private StorageAdapter storageAdapter;
+
+  // private ConsumerAdapter consumerAdapter;
+
+  public StorageStateModelFactory(StorageAdapter storage)
+  {
+    storageAdapter = storage;
+  }
+
+  StorageStateModel getStateModelForPartition(Integer partition)
+  {
+    return null;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String stateUnitKey)
+  {
+    logger.info("StorageStateModelFactory.getStateModel()");
+    return new StorageStateModel(stateUnitKey, storageAdapter);
+  }
+
+}