You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[24/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
new file mode 100644
index 0000000..62313a4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
@@ -0,0 +1,1029 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * A test is structured logically as a list of commands. A command has three parts:
+ * COMMAND | TRIGGER | ARG's. COMMAND could be: modify, verify, start, stop.
+ *
+ * TRIGGER is optional and consists of start-time, timeout, and expect-value, which means
+ * the COMMAND is triggered between [start-time, start-time + timeout] and is triggered
+ * when the value in concern equals expect-value.
+ *
+ * ARG's format depends on COMMAND. If COMMAND is modify/verify, arg is in the form of:
+ * <znode-path, property-type (SIMPLE, LIST, MAP, or ZNODE), operation(+, -, ==, !=), key,
+ * update-value>, in which key is k1 for a SIMPLE field, k1 or k1/index for a LIST field,
+ * and k1 or k1/k2 for a MAP field (key parts are separated by '/').
+ * If COMMAND is start/stop, arg is a thread handler.
+ *
+ * @author zzhang
+ *
+ */
+
+public class TestExecutor
+{
+  /**
+   * Kind of znode property a command operates on.
+   */
+  public enum ZnodePropertyType
+  {
+    /** simple field change */
+    SIMPLE,
+
+    /** list field change */
+    LIST,
+
+    /** map field change */
+    MAP,
+
+    /** entire znode change */
+    ZNODE
+  }
+
+  /**
+   * Shape of the value addressed by a (property type, key) pair, as resolved by
+   * getValueType(); INVALID is the result for a key that does not fit the property type.
+   */
+  private enum ZnodeModValueType
+  {
+    INVALID,
+    SINGLE_VALUE,
+    LIST_VALUE,
+    MAP_VALUE,
+    ZNODE_VALUE
+  }
+
+  private static Logger logger = Logger.getLogger(TestExecutor.class);
+
+  // pause between two consecutive attempts of the same command, in ms
+  private static final long SLEEP_TIME = 500;
+
+  // comparator used for every string-to-string value comparison in this class
+  private final static PropertyJsonComparator<String> STRING_COMPARATOR =
+      new PropertyJsonComparator<String>(String.class);
+
+  // serializer used to deep-copy a ZNRecord (serialize, then deserialize)
+  private final static PropertyJsonSerializer<ZNRecord> ZNRECORD_SERIALIZER =
+      new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
+
+  /**
+   * Resolves which kind of value a (property type, key) pair addresses.
+   *
+   * <ul>
+   * <li>SIMPLE: the key must have exactly one '/'-separated part; yields SINGLE_VALUE</li>
+   * <li>LIST: "key1" yields LIST_VALUE; "key1/index" with a non-negative integer index
+   * yields SINGLE_VALUE</li>
+   * <li>MAP: "key1" yields MAP_VALUE; "key1/key2" yields SINGLE_VALUE</li>
+   * <li>ZNODE: always yields ZNODE_VALUE; the key is not consulted</li>
+   * </ul>
+   *
+   * @param type property type the key refers to
+   * @param key  '/'-separated key; may be null, which is reported as invalid for
+   *             SIMPLE, LIST and MAP
+   * @return the resolved value type, or INVALID (after logging a warning) when the key
+   *         does not fit the property type
+   */
+  private static ZnodeModValueType getValueType(ZnodePropertyType type, String key)
+  {
+    ZnodeModValueType valueType = ZnodeModValueType.INVALID;
+    switch (type)
+    {
+    case SIMPLE:
+      if (key == null)
+      {
+        logger.warn("invalid key for simple field: key is null");
+      }
+      else if (key.split("/").length != 1)
+      {
+        logger.warn("invalid key for simple field: " + key
+            + ", expect 1 part: key1 (no slash)");
+      }
+      else
+      {
+        valueType = ZnodeModValueType.SINGLE_VALUE;
+      }
+      break;
+    case LIST:
+      if (key == null)
+      {
+        logger.warn("invalid key for list field: key is null");
+      }
+      else
+      {
+        String keyParts[] = key.split("/");
+        if (keyParts.length < 1 || keyParts.length > 2)
+        {
+          logger.warn("invalid key for list field: " + key
+              + ", expect 1 or 2 parts: key1 or key1/index");
+        }
+        else if (keyParts.length == 1)
+        {
+          valueType = ZnodeModValueType.LIST_VALUE;
+        }
+        else
+        {
+          // the second part must be a usable list index
+          try
+          {
+            int index = Integer.parseInt(keyParts[1]);
+            if (index < 0)
+            {
+              logger.warn("invalid key for list field: " + key + ", index < 0");
+            }
+            else
+            {
+              valueType = ZnodeModValueType.SINGLE_VALUE;
+            }
+          }
+          catch (NumberFormatException e)
+          {
+            logger.warn("invalid key for list field: " + key
+                + ", part-2 is NOT an integer");
+          }
+        }
+      }
+      break;
+    case MAP:
+      if (key == null)
+      {
+        logger.warn("invalid key for map field: key is null");
+      }
+      else
+      {
+        String keyParts[] = key.split("/");
+        if (keyParts.length < 1 || keyParts.length > 2)
+        {
+          logger.warn("invalid key for map field: " + key
+              + ", expect 1 or 2 parts: key1 or key1/key2");
+        }
+        else if (keyParts.length == 1)
+        {
+          valueType = ZnodeModValueType.MAP_VALUE;
+        }
+        else
+        {
+          valueType = ZnodeModValueType.SINGLE_VALUE;
+        }
+      }
+      break;
+    case ZNODE:
+      valueType = ZnodeModValueType.ZNODE_VALUE;
+      break;
+    default:
+      break;
+    }
+    return valueType;
+  }
+
+  /**
+   * Reads one string value out of a record.
+   *
+   * @param record record to read from; may be null
+   * @param type   SIMPLE reads the simple field named by the whole key; LIST reads
+   *               element "index" of list field "key1" for a key of the form
+   *               key1/index; MAP reads entry "key2" of map field "key1" for a key of
+   *               the form key1/key2; any other type yields null
+   * @param key    '/'-separated key; may be null
+   * @return the addressed value, or null when the record or key is null, the key has
+   *         no second part, the list/map field does not exist, or the list index is
+   *         out of range
+   */
+  private static String getSingleValue(ZNRecord record, ZnodePropertyType type, String key)
+  {
+    if (record == null || key == null)
+    {
+      return null;
+    }
+
+    String value = null;
+    String keyParts[] = key.split("/");
+
+    switch (type)
+    {
+    case SIMPLE:
+      value = record.getSimpleField(key);
+      break;
+    case LIST:
+      if (keyParts.length < 2)
+      {
+        logger.warn("invalid key for list field: " + key + ", expect key1/index");
+        return null;
+      }
+      List<String> list = record.getListField(keyParts[0]);
+      if (list == null)
+      {
+        logger.warn("invalid key for list field: " + key
+            + ", list for key part-1 doesn't exist");
+        return null;
+      }
+      int idx = Integer.parseInt(keyParts[1]);
+      if (idx < 0 || idx >= list.size())
+      {
+        // the element is simply absent; report "no value" instead of throwing
+        logger.warn("invalid key for list field: " + key + ", index out of range");
+        return null;
+      }
+      value = list.get(idx);
+      break;
+    case MAP:
+      if (keyParts.length < 2)
+      {
+        logger.warn("invalid key for map field: " + key + ", expect key1/key2");
+        return null;
+      }
+      Map<String, String> map = record.getMapField(keyParts[0]);
+      if (map == null)
+      {
+        logger.warn("invalid key for map field: " + key
+            + ", map for key part-1 doesn't exist");
+        return null;
+      }
+      value = map.get(keyParts[1]);
+      break;
+    default:
+      break;
+    }
+
+    return value;
+  }
+
+  /**
+   * Looks up a whole list field.
+   *
+   * @param record record to read from; may be null
+   * @param key    name of the list field
+   * @return the list field stored under key, or null when the record is null
+   */
+  private static List<String> getListValue(ZNRecord record, String key)
+  {
+    return (record != null) ? record.getListField(key) : null;
+  }
+
+  /**
+   * Looks up a whole map field.
+   *
+   * @param record record to read from; may be null (e.g. the znode does not exist)
+   * @param key    name of the map field
+   * @return the map field stored under key, or null when the record is null
+   */
+  private static Map<String, String> getMapValue(ZNRecord record, String key)
+  {
+    if (record == null)
+    {
+      return null;
+    }
+    return record.getMapField(key);
+  }
+
+  // comparator's for single/list/map-value
+  private static boolean compareSingleValue(String actual,
+                                            String expect,
+                                            String key,
+                                            ZNRecord diff)
+  {
+    boolean ret = (STRING_COMPARATOR.compare(actual, expect) == 0);
+
+    if (diff != null)
+    {
+      diff.setSimpleField(key + "/expect", expect);
+      diff.setSimpleField(key + "/actual", actual);
+    }
+    return ret;
+  }
+
+  /**
+   * Compares two lists element by element, in order.
+   *
+   * @param actualList list that was read; may be null
+   * @param expectList list that is expected; may be null
+   * @param key        key under which differences are recorded in diff
+   * @param diff       when non-null, receives the differing elements as list fields
+   *                   "key/expect" and "key/actual"
+   * @return true when both lists are null, or both are non-null, have the same length
+   *         and every pair of elements at the same position compares equal
+   */
+  private static boolean compareListValue(List<String> actualList,
+                                          List<String> expectList,
+                                          String key,
+                                          ZNRecord diff)
+  {
+    if (actualList == null && expectList == null)
+    {
+      return true;
+    }
+
+    if (actualList == null)
+    {
+      if (diff != null)
+      {
+        diff.setListField(key + "/expect", expectList);
+      }
+      return false;
+    }
+
+    if (expectList == null)
+    {
+      if (diff != null)
+      {
+        diff.setListField(key + "/actual", actualList);
+      }
+      return false;
+    }
+
+    if (diff != null && diff.getListField(key + "/expect") == null)
+    {
+      diff.setListField(key + "/expect", new ArrayList<String>());
+    }
+    if (diff != null && diff.getListField(key + "/actual") == null)
+    {
+      diff.setListField(key + "/actual", new ArrayList<String>());
+    }
+
+    boolean ret = true;
+    Iterator<String> itrActual = actualList.iterator();
+    Iterator<String> itrExpect = expectList.iterator();
+
+    // common prefix: compare position by position
+    while (itrActual.hasNext() && itrExpect.hasNext())
+    {
+      String actual = itrActual.next();
+      String expect = itrExpect.next();
+
+      if (STRING_COMPARATOR.compare(actual, expect) != 0)
+      {
+        ret = false;
+        if (diff != null)
+        {
+          diff.getListField(key + "/expect").add(expect);
+          diff.getListField(key + "/actual").add(actual);
+        }
+      }
+    }
+
+    // any leftover element on either side means the lists differ in length
+    while (itrActual.hasNext())
+    {
+      ret = false;
+      String actual = itrActual.next();
+      if (diff != null)
+      {
+        diff.getListField(key + "/actual").add(actual);
+      }
+    }
+
+    while (itrExpect.hasNext())
+    {
+      ret = false;
+      String expect = itrExpect.next();
+      if (diff != null)
+      {
+        diff.getListField(key + "/expect").add(expect);
+      }
+    }
+    return ret;
+  }
+
+  private static void setMapField(ZNRecord record, String key1, String key2, String value)
+  {
+    if (record.getMapField(key1) == null)
+    {
+      record.setMapField(key1, new TreeMap<String, String>());
+    }
+    record.getMapField(key1).put(key2, value);
+  }
+
+  /**
+   * Compares two maps entry by entry.
+   *
+   * @param actualMap map that was read; may be null
+   * @param expectMap map that is expected; may be null
+   * @param mapKey    key under which differences are recorded in diff
+   * @param diff      when non-null, receives the differing entries as map fields
+   *                  "mapKey/expect" and "mapKey/actual"
+   * @return true when both maps are null, or both are non-null, have the same key set
+   *         and the values of every key compare equal
+   */
+  private static boolean compareMapValue(Map<String, String> actualMap,
+                                         Map<String, String> expectMap,
+                                         String mapKey,
+                                         ZNRecord diff)
+  {
+    if (actualMap == null && expectMap == null)
+    {
+      return true;
+    }
+
+    if (actualMap == null)
+    {
+      if (diff != null)
+      {
+        diff.setMapField(mapKey + "/expect", expectMap);
+      }
+      return false;
+    }
+
+    if (expectMap == null)
+    {
+      if (diff != null)
+      {
+        diff.setMapField(mapKey + "/actual", actualMap);
+      }
+      return false;
+    }
+
+    boolean ret = true;
+
+    // first pass: keys only in actual, and keys present on both sides
+    for (Map.Entry<String, String> entry : actualMap.entrySet())
+    {
+      String key = entry.getKey();
+      String actual = entry.getValue();
+      if (!expectMap.containsKey(key))
+      {
+        ret = false;
+        if (diff != null)
+        {
+          setMapField(diff, mapKey + "/actual", key, actual);
+        }
+      }
+      else
+      {
+        String expect = expectMap.get(key);
+        if (STRING_COMPARATOR.compare(actual, expect) != 0)
+        {
+          ret = false;
+          if (diff != null)
+          {
+            setMapField(diff, mapKey + "/actual", key, actual);
+            setMapField(diff, mapKey + "/expect", key, expect);
+          }
+        }
+      }
+    }
+
+    // second pass: only keys missing from actual are left to report; shared keys
+    // were already compared above
+    for (Map.Entry<String, String> entry : expectMap.entrySet())
+    {
+      String key = entry.getKey();
+      if (!actualMap.containsKey(key))
+      {
+        ret = false;
+        if (diff != null)
+        {
+          setMapField(diff, mapKey + "/expect", key, entry.getValue());
+        }
+      }
+    }
+    return ret;
+  }
+
+  private static void setZNRecord(ZNRecord diff, ZNRecord record, String keySuffix)
+  {
+    if (diff == null || record == null)
+    {
+      return;
+    }
+
+    for (String key : record.getSimpleFields().keySet())
+    {
+      diff.setSimpleField(key + "/" + keySuffix, record.getSimpleField(key));
+    }
+
+    for (String key : record.getListFields().keySet())
+    {
+      diff.setListField(key + "/" + keySuffix, record.getListField(key));
+    }
+
+    for (String key : record.getMapFields().keySet())
+    {
+      diff.setMapField(key + "/" + keySuffix, record.getMapField(key));
+    }
+  }
+
+  /**
+   * Compares two whole records: their simple fields, list fields and map fields.
+   *
+   * @param actual record that was read; may be null
+   * @param expect record that is expected; may be null
+   * @param diff   when non-null, receives the differences, keyed by
+   *               "fieldKey/expect" and "fieldKey/actual"
+   * @return true when both records are null, or both are non-null and all of their
+   *         simple, list and map fields compare equal; the result does not depend on
+   *         whether diff is supplied
+   */
+  private static boolean compareZnodeValue(ZNRecord actual, ZNRecord expect, ZNRecord diff)
+  {
+    if (actual == null && expect == null)
+    {
+      return true;
+    }
+
+    if (actual == null)
+    {
+      // setZNRecord() is a no-op for a null diff
+      setZNRecord(diff, expect, "expect");
+      return false;
+    }
+
+    if (expect == null)
+    {
+      setZNRecord(diff, actual, "actual");
+      return false;
+    }
+
+    boolean ret = true;
+
+    // simple fields
+    for (String key : actual.getSimpleFields().keySet())
+    {
+      if (!compareSingleValue(actual.getSimpleField(key),
+                              expect.getSimpleField(key),
+                              key,
+                              diff))
+      {
+        ret = false;
+      }
+    }
+    for (String key : expect.getSimpleFields().keySet())
+    {
+      if (!actual.getSimpleFields().containsKey(key))
+      {
+        ret = false;
+        if (diff != null)
+        {
+          diff.setSimpleField(key + "/expect", expect.getSimpleField(key));
+        }
+      }
+    }
+
+    // list fields; compareListValue() handles a list missing on either side
+    for (String key : actual.getListFields().keySet())
+    {
+      if (!compareListValue(actual.getListField(key), expect.getListField(key), key, diff))
+      {
+        ret = false;
+      }
+    }
+    for (String key : expect.getListFields().keySet())
+    {
+      if (!actual.getListFields().containsKey(key))
+      {
+        if (!compareListValue(null, expect.getListField(key), key, diff))
+        {
+          ret = false;
+        }
+      }
+    }
+
+    // map fields
+    for (String key : expect.getMapFields().keySet())
+    {
+      if (!actual.getMapFields().containsKey(key))
+      {
+        // the mismatch must be reported even when no diff is being collected
+        ret = false;
+        if (diff != null)
+        {
+          diff.setMapField(key + "/expect", expect.getMapField(key));
+        }
+      }
+      else if (!compareMapValue(actual.getMapField(key), expect.getMapField(key), key, diff))
+      {
+        ret = false;
+      }
+    }
+
+    // shared map keys were compared above; only actual-only keys remain
+    for (String key : actual.getMapFields().keySet())
+    {
+      if (!expect.getMapFields().containsKey(key))
+      {
+        ret = false;
+        if (diff != null)
+        {
+          diff.setMapField(key + "/actual", actual.getMapField(key));
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Empties the simple, list and map fields of the record; a null record is ignored.
+   *
+   * @param record record to clear; may be null
+   */
+  private static void resetZNRecord(ZNRecord record)
+  {
+    if (record == null)
+    {
+      return;
+    }
+
+    record.getSimpleFields().clear();
+    record.getListFields().clear();
+    record.getMapFields().clear();
+  }
+
+  /**
+   * Checks whether the value addressed by (type, key) in the current record matches
+   * the expected value.
+   *
+   * @param current record that was read; may be null
+   * @param type    property type the key refers to
+   * @param key     '/'-separated key
+   * @param expect  expected value; null means nothing is expected and always matches
+   * @param diff    cleared first (unless expect is null), then filled by the
+   *                comparison that is run; may be null
+   * @return true when expect is null or the addressed value matches; false on a
+   *         mismatch or when the key is invalid for the property type
+   */
+  private static boolean isValueExpected(ZNRecord current,
+                                         ZnodePropertyType type,
+                                         String key,
+                                         ZnodeValue expect,
+                                         ZNRecord diff)
+  {
+    // expect value = null means not expect any value
+    if (expect == null)
+    {
+      return true;
+    }
+
+    resetZNRecord(diff);
+
+    switch (getValueType(type, key))
+    {
+    case SINGLE_VALUE:
+      return compareSingleValue(getSingleValue(current, type, key),
+                                expect._singleValue,
+                                key,
+                                diff);
+    case LIST_VALUE:
+      return compareListValue(getListValue(current, key), expect._listValue, key, diff);
+    case MAP_VALUE:
+      return compareMapValue(getMapValue(current, key), expect._mapValue, key, diff);
+    case ZNODE_VALUE:
+      return compareZnodeValue(current, expect._znodeValue, diff);
+    case INVALID:
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Writes one string value into a record.
+   *
+   * @param record record to modify; must not be null
+   * @param type   SIMPLE sets the simple field named by the whole key; LIST overwrites
+   *               element "index" of the existing list field "key1" for a key of the
+   *               form key1/index; MAP puts entry "key2" into the existing map field
+   *               "key1" for a key of the form key1/key2; other types are ignored
+   * @param key    '/'-separated key; must not be null
+   * @param value  value to store
+   * @throws IndexOutOfBoundsException when the list index is outside the existing list
+   */
+  private static void setSingleValue(ZNRecord record,
+                                     ZnodePropertyType type,
+                                     String key,
+                                     String value)
+  {
+    String keyParts[] = key.split("/");
+
+    switch (type)
+    {
+    case SIMPLE:
+      record.setSimpleField(key, value);
+      break;
+    case LIST:
+      List<String> list = record.getListField(keyParts[0]);
+      if (list == null)
+      {
+        logger.warn("invalid key for list field: " + key
+            + ", value for key part-1 doesn't exist");
+        return;
+      }
+      int idx = Integer.parseInt(keyParts[1]);
+      // overwrite in place instead of remove-then-insert
+      list.set(idx, value);
+      break;
+    case MAP:
+      Map<String, String> map = record.getMapField(keyParts[0]);
+      if (map == null)
+      {
+        logger.warn("invalid key for map field: " + key
+            + ", value for key part-1 doesn't exist");
+        return;
+      }
+      map.put(keyParts[1], value);
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Sets a whole list field on the record.
+   *
+   * @param record record to modify; must not be null
+   * @param key    name of the list field
+   * @param value  list to store under key
+   */
+  private static void setListValue(ZNRecord record, String key, List<String> value)
+  {
+    record.setListField(key, value);
+  }
+
+  /**
+   * Sets a whole map field on the record.
+   *
+   * @param record record to modify; must not be null
+   * @param key    name of the map field
+   * @param value  map to store under key
+   */
+  private static void setMapValue(ZNRecord record, String key, Map<String, String> value)
+  {
+    record.setMapField(key, value);
+  }
+
+  private static void removeSingleValue(ZNRecord record,
+                                        ZnodePropertyType type,
+                                        String key)
+  {
+    if (record == null)
+    {
+      return;
+    }
+
+    String keyParts[] = key.split("/");
+    switch (type)
+    {
+    case SIMPLE:
+      record.getSimpleFields().remove(key);
+      break;
+    case LIST:
+      List<String> list = record.getListField(keyParts[0]);
+      if (list == null)
+      {
+        logger.warn("invalid key for list field: " + key
+            + ", value for key part-1 doesn't exist");
+        return;
+      }
+      int idx = Integer.parseInt(keyParts[1]);
+      list.remove(idx);
+      break;
+    case MAP:
+      Map<String, String> map = record.getMapField(keyParts[0]);
+      if (map == null)
+      {
+        logger.warn("invalid key for map field: " + key
+            + ", value for key part-1 doesn't exist");
+        return;
+      }
+      map.remove(keyParts[1]);
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Removes a whole list field from the record. Does nothing when the record, or its
+   * collection of list fields, is null.
+   *
+   * @param record record to modify; may be null
+   * @param key    name of the list field to remove
+   */
+  private static void removeListValue(ZNRecord record, String key)
+  {
+    if (record != null && record.getListFields() != null)
+    {
+      record.getListFields().remove(key);
+    }
+  }
+
+  /**
+   * Removes a whole map field from the record. Does nothing when the record, or its
+   * collection of map fields, is null.
+   *
+   * @param record record to modify; may be null (e.g. the znode does not exist)
+   * @param key    name of the map field to remove
+   */
+  private static void removeMapValue(ZNRecord record, String key)
+  {
+    if (record != null && record.getMapFields() != null)
+    {
+      record.getMapFields().remove(key);
+    }
+  }
+
+  /**
+   * Evaluates a verify command against the record that was read.
+   *
+   * @param actual  record that was read; may be null
+   * @param command command providing the znode argument (property type, key,
+   *                operation) and the trigger holding the expected value
+   * @param diff    receives the differences found by the comparison; may be null
+   * @return for operation "==" whether the value matches the expected one; for "!="
+   *         the negation of that; false (with a warning) for any other operation
+   */
+  private static boolean executeVerifier(ZNRecord actual,
+                                         TestCommand command,
+                                         ZNRecord diff)
+  {
+    final ZnodeOpArg opArg = command._znodeOpArg;
+    final ZnodeValue expected = command._trigger._expectValue;
+
+    final boolean matches =
+        isValueExpected(actual, opArg._propertyType, opArg._key, expected, diff);
+
+    final String op = opArg._operation;
+    if (op.equals("!="))
+    {
+      return !matches;
+    }
+    if (op.equals("=="))
+    {
+      return matches;
+    }
+
+    logger.warn("fail to execute (unsupport operation=" + op + "):" + op);
+    return false;
+  }
+
+  /**
+   * Reads the znode and, when its current value matches the expected one, applies the
+   * modification described by arg and writes the znode back using the version that was
+   * read. The whole read-check-write cycle is attempted up to 3 times, sleeping between
+   * attempts with a back-off that starts at 20 ms and doubles each time.
+   *
+   * @param expect   value the znode must currently hold; null means no precondition
+   * @param arg      znode path, property type, key, operation ("+" to set, "-" to
+   *                 remove) and update value
+   * @param zkClient client used to read, create, write and delete the znode
+   * @param diff     receives the differences found by the precondition check; may be
+   *                 null
+   * @return true when the modification was written (or the znode deleted); false when
+   *         the operation is unsupported, the precondition never held, every attempt
+   *         failed, or the calling thread was interrupted while backing off
+   */
+  private static boolean compareAndSetZnode(ZnodeValue expect,
+                                            ZnodeOpArg arg,
+                                            ZkClient zkClient,
+                                            ZNRecord diff)
+  {
+    String path = arg._znodePath;
+    ZnodePropertyType type = arg._propertyType;
+    String key = arg._key;
+
+    // retry 3 times in case there are write conflicts
+    long backoffTime = 20; // ms
+    for (int i = 0; i < 3; i++)
+    {
+      try
+      {
+        Stat stat = new Stat();
+        ZNRecord record = zkClient.<ZNRecord> readDataAndStat(path, stat, true);
+
+        if (isValueExpected(record, type, key, expect, diff))
+        {
+          if (arg._operation.equals("+"))
+          {
+            if (record == null)
+            {
+              record = new ZNRecord("default");
+            }
+            ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
+            switch (valueType)
+            {
+            case SINGLE_VALUE:
+              setSingleValue(record,
+                             arg._propertyType,
+                             arg._key,
+                             arg._updateValue._singleValue);
+              break;
+            case LIST_VALUE:
+              setListValue(record, arg._key, arg._updateValue._listValue);
+              break;
+            case MAP_VALUE:
+              setMapValue(record, arg._key, arg._updateValue._mapValue);
+              break;
+            case ZNODE_VALUE:
+              // deep copy
+              record =
+                  ZNRECORD_SERIALIZER.deserialize(ZNRECORD_SERIALIZER.serialize(arg._updateValue._znodeValue));
+              break;
+            case INVALID:
+              break;
+            default:
+              break;
+            }
+          }
+          else if (arg._operation.equals("-"))
+          {
+            ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
+            switch (valueType)
+            {
+            case SINGLE_VALUE:
+              removeSingleValue(record, arg._propertyType, arg._key);
+              break;
+            case LIST_VALUE:
+              removeListValue(record, arg._key);
+              break;
+            case MAP_VALUE:
+              removeMapValue(record, arg._key);
+              break;
+            case ZNODE_VALUE:
+              // a null record makes the write step below delete the znode
+              record = null;
+              break;
+            case INVALID:
+              break;
+            default:
+              break;
+            }
+          }
+          else
+          {
+            logger.warn("fail to execute (unsupport operation): " + arg._operation);
+            return false;
+          }
+
+          if (record == null)
+          {
+            zkClient.delete(path);
+          }
+          else
+          {
+            try
+            {
+              zkClient.createPersistent(path, true);
+            }
+            catch (ZkNodeExistsException e)
+            {
+              // OK
+            }
+            // conditional write: fails with ZkBadVersionException if the znode
+            // changed since it was read above
+            zkClient.writeData(path, record, stat.getVersion());
+          }
+          return true;
+        }
+      }
+      catch (ZkBadVersionException e)
+      {
+        // write conflict; fall through to back off and retry
+        logger.debug("write conflict on " + path + ", attempt " + (i + 1), e);
+      }
+      catch (PropertyStoreException e)
+      {
+        logger.warn("fail to copy update value for " + path + ", attempt " + (i + 1), e);
+      }
+
+      try
+      {
+        Thread.sleep(backoffTime);
+      }
+      catch (InterruptedException e)
+      {
+        // keep the interrupt visible to the caller and stop retrying
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      backoffTime *= 2;
+    }
+
+    return false;
+  }
+
+  private static class ExecuteCommand implements Runnable
+  {
+    private final TestCommand               _command;
+    private final long                      _startTime;
+    private final ZkClient                  _zkClient;
+    private final CountDownLatch            _countDown;
+    private final Map<TestCommand, Boolean> _testResults;
+
+    public ExecuteCommand(long startTime,
+                          TestCommand command,
+                          CountDownLatch countDown,
+                          ZkClient zkClient,
+                          Map<TestCommand, Boolean> testResults)
+    {
+      _startTime = startTime;
+      _command = command;
+      _countDown = countDown;
+      _zkClient = zkClient;
+      _testResults = testResults;
+    }
+
+    @Override
+    public void run()
+    {
+      boolean result = false;
+      long now = System.currentTimeMillis();
+      final long timeout = now + _command._trigger._timeout;
+      ZNRecord diff = new ZNRecord("diff");
+      try
+      {
+        if (now < _startTime)
+        {
+          Thread.sleep(_startTime - now);
+        }
+
+        do
+        {
+          if (_command._commandType == CommandType.MODIFY)
+          {
+            ZnodeOpArg arg = _command._znodeOpArg;
+            final ZnodeValue expectValue = _command._trigger._expectValue;
+            result = compareAndSetZnode(expectValue, arg, _zkClient, diff);
+            // logger.error("result:" + result + ", " + _command);
+
+            if (result == true)
+            {
+              _command._finishTimestamp = System.currentTimeMillis();
+              _testResults.put(_command, true);
+
+              break;
+            }
+            else
+            {
+              // logger.error("result:" + result + ", diff:" + diff);
+            }
+          }
+          else if (_command._commandType == CommandType.VERIFY)
+          {
+            ZnodeOpArg arg = _command._znodeOpArg;
+            final String znodePath = arg._znodePath;
+            ZNRecord record = _zkClient.<ZNRecord> readData(znodePath, true);
+
+            result = executeVerifier(record, _command, diff);
+            // logger.error("result:" + result + ", " + _command.toString());
+            if (result == true)
+            {
+              _command._finishTimestamp = System.currentTimeMillis();
+              _testResults.put(_command, true);
+              break;
+            }
+            else
+            {
+              // logger.error("result:" + result + ", diff:" + diff);
+            }
+          }
+          else if (_command._commandType == CommandType.START)
+          {
+            // TODO add data trigger for START command
+            Thread thread = _command._nodeOpArg._thread;
+            thread.start();
+
+            result = true;
+            _command._finishTimestamp = System.currentTimeMillis();
+            logger.info("result:" + result + ", " + _command.toString());
+            _testResults.put(_command, true);
+            break;
+          }
+          else if (_command._commandType == CommandType.STOP)
+          {
+            // TODO add data trigger for STOP command
+            HelixManager manager = _command._nodeOpArg._manager;
+            manager.disconnect();
+            Thread thread = _command._nodeOpArg._thread;
+            thread.interrupt();
+
+            // System.err.println("stop " +
+            // _command._nodeOpArg._manager.getInstanceName());
+            result = true;
+            _command._finishTimestamp = System.currentTimeMillis();
+            logger.info("result:" + result + ", " + _command.toString());
+            _testResults.put(_command, true);
+            break;
+          }
+          else
+          {
+            throw new IllegalArgumentException("Unsupport command type (was "
+                + _command._commandType + ")");
+          }
+
+          Thread.sleep(SLEEP_TIME);
+
+          now = System.currentTimeMillis();
+        }
+        while (now <= timeout);
+      }
+      catch (Exception e)
+      {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      finally
+      {
+        if (result == false)
+        {
+          _command._finishTimestamp = System.currentTimeMillis();
+          logger.error("result:" + result + ", diff: " + diff);
+        }
+        _countDown.countDown();
+        if (_countDown.getCount() == 0)
+        {
+          if (_zkClient != null && _zkClient.getConnection() != null)
+
+          {
+            _zkClient.close();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Sorts the commands by trigger start time and launches one thread per command, all
+   * sharing a single ZkClient connected to zkAddr. Returns immediately; completion is
+   * signalled through the latch.
+   *
+   * @param commandList commands to execute; sorted in place by trigger start time
+   * @param zkAddr      zookeeper address to connect to
+   * @param countDown   latch counted down once per finished command
+   * @return a concurrent map holding false for every command at launch; a command is
+   *         set to true by its thread when it succeeds
+   */
+  private static Map<TestCommand, Boolean> executeTestHelper(List<TestCommand> commandList,
+                                                             String zkAddr,
+                                                             CountDownLatch countDown)
+  {
+
+    final Map<TestCommand, Boolean> testResults =
+        new ConcurrentHashMap<TestCommand, Boolean>();
+
+    // the client is closed by the last command thread to finish; with no commands
+    // nothing would ever close it, so do not open one at all
+    if (commandList.isEmpty())
+    {
+      return testResults;
+    }
+
+    final ZkClient zkClient = new ZkClient(zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    // sort on trigger's start time, stable sort
+    Collections.sort(commandList, new Comparator<TestCommand>()
+    {
+      @Override
+      public int compare(TestCommand o1, TestCommand o2)
+      {
+        // explicit comparison: casting the long difference to int can overflow
+        final long lhs = o1._trigger._startTime;
+        final long rhs = o2._trigger._startTime;
+        return (lhs < rhs) ? -1 : ((lhs > rhs) ? 1 : 0);
+      }
+    });
+
+    for (TestCommand command : commandList)
+    {
+      testResults.put(command, Boolean.FALSE);
+
+      TestTrigger trigger = command._trigger;
+      // the trigger's start time is relative; turn it into an absolute timestamp
+      command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
+      new Thread(new ExecuteCommand(command._startTimestamp,
+                                    command,
+                                    countDown,
+                                    zkClient,
+                                    testResults)).start();
+    }
+
+    return testResults;
+  }
+
+  /**
+   * Launches the commands and returns without waiting for them to finish; the results
+   * are not made available to the caller.
+   *
+   * @param commandList commands to execute
+   * @param zkAddr      zookeeper address to connect to
+   * @throws InterruptedException declared by the signature; nothing in this method
+   *           body blocks
+   */
+  public static void executeTestAsync(List<TestCommand> commandList, String zkAddr) throws InterruptedException
+  {
+    // one count per command; the latch is only used by the command threads here
+    CountDownLatch countDown = new CountDownLatch(commandList.size());
+    executeTestHelper(commandList, zkAddr, countDown);
+  }
+
+  /**
+   * Launches the commands and blocks until every one of them has finished.
+   *
+   * @param commandList commands to execute
+   * @param zkAddr      zookeeper address to connect to
+   * @return map from each command to whether it succeeded
+   * @throws InterruptedException when the calling thread is interrupted while waiting
+   */
+  public static Map<TestCommand, Boolean> executeTest(List<TestCommand> commandList,
+                                                      String zkAddr) throws InterruptedException
+  {
+    final int pending = commandList.size();
+    final CountDownLatch allDone = new CountDownLatch(pending);
+
+    final Map<TestCommand, Boolean> outcome =
+        executeTestHelper(commandList, zkAddr, allDone);
+
+    // TODO add timeout
+    allDone.await();
+    return outcome;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/TestTrigger.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestTrigger.java b/helix-core/src/main/java/org/apache/helix/tools/TestTrigger.java
new file mode 100644
index 0000000..31c44ab
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestTrigger.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+
+
+/**
+ * Describes when a test command fires: an optional start time / timeout pair
+ * and an optional expected znode value.
+ */
+public class TestTrigger
+{
+  /** start time of the time trigger; 0 when constructed without one */
+  public long _startTime;
+  /** timeout; rendered with an "ms" suffix by {@link #toString()} */
+  public long _timeout;
+  /** expected value of the data trigger; null when there is no data trigger */
+  public ZnodeValue _expectValue;
+
+  /**
+   * Creates a trigger with neither a time nor a data condition
+   * (start time 0, timeout 0, no expected value).
+   */
+  public TestTrigger()
+  {
+    this(0, 0, (ZnodeValue) null);
+  }
+
+  /**
+   * Creates a pure time trigger: timeout 0 and no expected value.
+   *
+   * @param startTime start time of the trigger
+   */
+  public TestTrigger(long startTime)
+  {
+    this(startTime, 0, (ZnodeValue) null);
+  }
+
+  /**
+   * Creates a data trigger on a simple field.
+   *
+   * @param startTime start time of the trigger
+   * @param timeout timeout of the trigger
+   * @param expect expected simple value, wrapped in a {@link ZnodeValue}
+   */
+  public TestTrigger(long startTime, long timeout, String expect)
+  {
+    this(startTime, timeout, new ZnodeValue(expect));
+  }
+
+  /**
+   * Creates a data trigger on a list field.
+   *
+   * @param startTime start time of the trigger
+   * @param timeout timeout of the trigger
+   * @param expect expected list value, wrapped in a {@link ZnodeValue}
+   */
+  public TestTrigger(long startTime, long timeout, List<String> expect)
+  {
+    this(startTime, timeout, new ZnodeValue(expect));
+  }
+
+  /**
+   * Creates a data trigger on a map field.
+   *
+   * @param startTime start time of the trigger
+   * @param timeout timeout of the trigger
+   * @param expect expected map value, wrapped in a {@link ZnodeValue}
+   */
+  public TestTrigger(long startTime, long timeout, Map<String, String> expect)
+  {
+    this(startTime, timeout, new ZnodeValue(expect));
+  }
+
+  /**
+   * Creates a data trigger on a whole znode record.
+   *
+   * @param startTime start time of the trigger
+   * @param timeout timeout of the trigger
+   * @param expect expected record, wrapped in a {@link ZnodeValue}
+   */
+  public TestTrigger(long startTime, long timeout, ZNRecord expect)
+  {
+    this(startTime, timeout, new ZnodeValue(expect));
+  }
+
+  /**
+   * Canonical constructor every other constructor delegates to.
+   *
+   * @param startTime start time of the trigger
+   * @param timeout timeout of the trigger
+   * @param expect expected value, or null for no data trigger
+   */
+  public TestTrigger(long startTime, long timeout, ZnodeValue expect)
+  {
+    this._startTime = startTime;
+    this._timeout = timeout;
+    this._expectValue = expect;
+  }
+
+  /**
+   * @return the trigger rendered as {@code <start~timeoutms, expected>}
+   */
+  @Override
+  public String toString()
+  {
+    StringBuilder text = new StringBuilder();
+    text.append("<").append(_startTime);
+    text.append("~").append(_timeout).append("ms, ");
+    text.append(_expectValue).append(">");
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/YAISCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/YAISCalculator.java b/helix-core/src/main/java/org/apache/helix/tools/YAISCalculator.java
new file mode 100644
index 0000000..30b25b6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/YAISCalculator.java
@@ -0,0 +1,202 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+public class YAISCalculator
+{
+  static class Assignment
+  {
+    private final int numNodes;
+    private final int replication;
+    Partition[] partitions;
+    Node[] nodes;
+
+    public Assignment(int numNodes, int numPartitions, int replication)
+    {
+      this.numNodes = numNodes;
+      this.replication = replication;
+      partitions = new Partition[numPartitions];
+      for (int i = 0; i < numPartitions; i++)
+      {
+        partitions[i] = new Partition(i, replication);
+      }
+      nodes = new Node[numNodes];
+      for (int i = 0; i < numNodes; i++)
+      {
+        nodes[i] = new Node(replication);
+      }
+    }
+
+    public void assign(int partitionId, int replicaId, int nodeId)
+    {
+      System.out.println("Assigning (" + partitionId + "," + replicaId
+          + ") to " + nodeId);
+      partitions[partitionId].nodeIds[replicaId] = nodeId;
+      nodes[nodeId].partitionLists.get(replicaId).push(partitionId);
+    }
+
+    public void unassign(int partitionId, int replicaId)
+    {
+
+    }
+
+    Integer[] getPartitionsPerNode(int nodeId, int replicaId)
+    {
+      List<Integer> partitionsList = new ArrayList<Integer>();
+      for (Partition p : partitions)
+      {
+        if (p.nodeIds[replicaId] == nodeId)
+        {
+          partitionsList.add(p.partionId);
+        }
+      }
+      Integer[] array = new Integer[partitionsList.size()];
+      partitionsList.toArray(array);
+      return array;
+    }
+
+    public void printPerNode()
+    {
+      for (int nodeId = 0; nodeId < numNodes; nodeId++)
+      {
+        for (int r = 0; r < replication; r++)
+        {
+          StringBuilder sb = new StringBuilder();
+          sb.append("(").append(nodeId).append(",").append(r).append("):\t");
+          Node node = nodes[nodeId];
+          LinkedList<Integer> linkedList = node.partitionLists.get(r);
+          for (int partitionId : linkedList)
+          {
+            sb.append(partitionId).append(",");
+          }
+          System.out.println(sb.toString());
+        }
+
+      }
+    }
+  }
+
+  static class Partition
+  {
+
+    final int partionId;
+
+    public Partition(int partionId, int replication)
+    {
+      this.partionId = partionId;
+      nodeIds = new int[replication];
+      Arrays.fill(nodeIds, -1);
+    }
+
+    int nodeIds[];
+  }
+
+  static class Node
+  {
+    private final int replication;
+    ArrayList<LinkedList<Integer>> partitionLists;
+
+    public Node(int replication)
+    {
+      this.replication = replication;
+      partitionLists = new ArrayList<LinkedList<Integer>>(replication);
+      for (int i = 0; i < replication; i++)
+      {
+        partitionLists.add(new LinkedList<Integer>());
+      }
+    }
+
+  }
+
+  public static void main(String[] args)
+  {
+    doAssignment(new int[]
+    { 5 }, 120, 3);
+  }
+
+  private static void doAssignment(int[] nodes, int partitions, int replication)
+  {
+    int N = nodes[0];
+    int totalNodes = 0;
+    for (int temp : nodes)
+    {
+      totalNodes += temp;
+    }
+    Assignment assignment = new Assignment(totalNodes, partitions, replication);
+    int nodeId = 0;
+    for (int i = 0; i < partitions; i++)
+    {
+      assignment.assign(i, 0, nodeId);
+      nodeId = (nodeId + 1) % N;
+    }
+    Random random = new Random();
+    for (int r = 1; r < replication; r++)
+    {
+      for (int id = 0; id < N; id++)
+      {
+        Integer[] partitionsPerNode = assignment.getPartitionsPerNode(id, 0);
+        boolean[] used = new boolean[partitionsPerNode.length];
+        Arrays.fill(used, false);
+        System.out.println(id + "-" + partitionsPerNode.length);
+        nodeId = (id + r) % N;
+        int count = partitionsPerNode.length;
+        boolean done = false;
+        do
+        {
+          if (nodeId != id)
+          {
+            int nextInt = random.nextInt(count);
+            int temp = 0;
+            for (int b = 0; b < used.length; b++)
+            {
+              if (!used[b] && temp == nextInt)
+              {
+                assignment.assign(partitionsPerNode[b], r, nodeId);
+                used[b] = true;
+                break;
+              }
+            }
+          }
+          nodeId = (nodeId + 1) % N;
+        } while (count > 0);
+
+      }
+    }
+    if (nodes.length > 1)
+    {
+      int prevNodeCount = nodes[0];
+      for (int i = 1; i < nodes.length; i++)
+      {
+        int newNodeCount = prevNodeCount + nodes[i];
+        int masterPartitionsToMove = (int) ((partitions * 1.0 / prevNodeCount - partitions
+            * 1.0 / newNodeCount) * 1 * prevNodeCount);
+        while (masterPartitionsToMove > 0)
+        {
+
+        }
+
+      }
+    }
+    assignment.printPerNode();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZKDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZKDumper.java b/helix-core/src/main/java/org/apache/helix/tools/ZKDumper.java
new file mode 100644
index 0000000..279cf76
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZKDumper.java
@@ -0,0 +1,274 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.helix.manager.zk.ZkClient;
+
+
+/**
+ * Dumps the Zookeeper file structure on to Disk, uploads a directory tree
+ * from disk into Zookeeper, or deletes a Zookeeper path.
+ * 
+ * @author kgopalak
+ * 
+ */
+@SuppressWarnings("static-access")
+public class ZKDumper
+{
+  private ZkClient client;
+  // skips entries whose name starts with "." when uploading directories
+  private FilenameFilter filter;
+  static Options options;
+  // appended to the name of every file written by download()
+  private String suffix = "";
+  //enable by default
+  private boolean removeSuffix=true;
+
+  public String getSuffix()
+  {
+    return suffix;
+  }
+
+  public void setSuffix(String suffix)
+  {
+    this.suffix = suffix;
+  }
+
+  public boolean isRemoveSuffix()
+  {
+    return removeSuffix;
+  }
+
+  public void setRemoveSuffix(boolean removeSuffix)
+  {
+    this.removeSuffix = removeSuffix;
+  }
+
+  static
+  {
+    options = new Options();
+    OptionGroup optionGroup = new OptionGroup();
+    
+    Option d = OptionBuilder.withLongOpt("download")
+        .withDescription("Download from ZK to File System").create();
+    d.setArgs(0);
+    Option dSuffix = OptionBuilder.withLongOpt("addSuffix")
+        .withDescription("add suffix to every file downloaded from ZK").create();
+    dSuffix.setArgs(1);
+    dSuffix.setRequired(false);
+    
+    Option u = OptionBuilder.withLongOpt("upload")
+        .withDescription("Upload from File System to ZK").create();
+    u.setArgs(0);
+    Option uSuffix = OptionBuilder.withLongOpt("removeSuffix")
+        .withDescription("remove suffix from every file uploaded to ZK").create();
+    uSuffix.setArgs(0);
+    uSuffix.setRequired(false);
+    
+    Option del = OptionBuilder.withLongOpt("delete")
+        .withDescription("Delete given path from ZK").create();
+
+    // exactly one of delete / upload / download has to be given
+    optionGroup.setRequired(true);
+    optionGroup.addOption(del);
+    optionGroup.addOption(u);
+    optionGroup.addOption(d);
+    options.addOptionGroup(optionGroup);
+    options.addOption("zkSvr", true, "Zookeeper address");
+    options.addOption("zkpath", true, "Zookeeper path");
+    options.addOption("fspath", true, "Path on local Filesystem to dump");
+    options.addOption("h", "help", false, "Print this usage information");
+    options.addOption("v", "verbose", false, "Print out VERBOSE information");
+    options.addOption(dSuffix);
+    options.addOption(uSuffix);
+  }
+
+  /**
+   * Connects to Zookeeper and installs a serializer that stores and reads
+   * znode data as plain strings.
+   *
+   * @param zkAddress zookeeper address to connect to
+   */
+  public ZKDumper(String zkAddress)
+  {
+    client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+    ZkSerializer zkSerializer = new ZkSerializer()
+    {
+
+      @Override
+      public byte[] serialize(Object arg0) throws ZkMarshallingError
+      {
+        return arg0.toString().getBytes();
+      }
+
+      @Override
+      public Object deserialize(byte[] arg0) throws ZkMarshallingError
+      {
+        return new String(arg0);
+      }
+    };
+    client.setZkSerializer(zkSerializer);
+    filter = new FilenameFilter()
+    {
+
+      @Override
+      public boolean accept(File dir, String name)
+      {
+        return !name.startsWith(".");
+      }
+    };
+  }
+
+  /**
+   * Command line entry point; see the option definitions for the accepted
+   * arguments. Prints usage and exits with status 1 when no arguments are
+   * given or a value needed by the chosen operation is missing.
+   */
+  public static void main(String[] args) throws Exception
+  {
+    if (args == null || args.length == 0)
+    {
+      printUsageAndExit();
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    boolean download = cmd.hasOption("download");
+    boolean upload = cmd.hasOption("upload");
+    boolean del = cmd.hasOption("delete");
+    String zkAddress = cmd.getOptionValue("zkSvr");
+    String zkPath = cmd.getOptionValue("zkpath");
+    String fsPath = cmd.getOptionValue("fspath");
+
+    // without this check a missing fspath would silently become the literal
+    // directory name "null"
+    if (zkAddress == null || zkPath == null
+        || ((download || upload) && fsPath == null))
+    {
+      printUsageAndExit();
+    }
+
+    ZKDumper zkDump = new ZKDumper(zkAddress);
+    if (download)
+    {
+      if (cmd.hasOption("addSuffix"))
+      {
+        zkDump.suffix = cmd.getOptionValue("addSuffix");
+      }
+      zkDump.download(zkPath, fsPath + zkPath);
+    }
+    if (upload)
+    {
+      if (cmd.hasOption("removeSuffix"))
+      {
+        zkDump.removeSuffix = true;
+      }
+      zkDump.upload(zkPath, fsPath);
+    }
+    if (del)
+    {
+      zkDump.delete(zkPath);
+    }
+  }
+
+  // prints the command line help and terminates the JVM with status 1
+  private static void printUsageAndExit()
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + ZKDumper.class.getName(), options);
+    System.exit(1);
+  }
+
+  private void delete(String zkPath)
+  {
+    client.deleteRecursive(zkPath);
+
+  }
+
+  /**
+   * Recursively copies a file or directory into Zookeeper. Directories
+   * become persistent znodes, files become persistent znodes holding the
+   * file content (lines joined with "\n"). Entries starting with "." are
+   * skipped inside directories.
+   *
+   * @param zkPath target znode path
+   * @param fsPath source path on the local file system
+   * @throws Exception if reading the file or writing to Zookeeper fails
+   */
+  public void upload(String zkPath, String fsPath) throws Exception
+  {
+    File file = new File(fsPath);
+    System.out
+        .println("Uploading " + file.getCanonicalPath() + " to " + zkPath);
+    zkPath = stripSuffix(zkPath.replaceAll("[/]+", "/"), file.getName());
+    if (file.isDirectory())
+    {
+      File[] children = file.listFiles(filter);
+      client.createPersistent(zkPath, true);
+      if (children != null)
+      {
+        for (File child : children)
+        {
+          upload(zkPath + "/" + child.getName(), fsPath + "/" + child.getName());
+        }
+      }
+    } else
+    {
+      client.createPersistent(zkPath, readFile(file));
+    }
+  }
+
+  // When suffix removal is enabled, drops everything from the first "." of
+  // fileName off the END of zkPath. The suffix is matched literally; it must
+  // not be treated as a regular expression nor be removed from the middle
+  // of the path.
+  private String stripSuffix(String zkPath, String fileName)
+  {
+    if (!removeSuffix)
+    {
+      return zkPath;
+    }
+    int index = fileName.indexOf('.');
+    if (index < 0)
+    {
+      return zkPath;
+    }
+    String fileSuffix = fileName.substring(index);
+    if (zkPath.endsWith(fileSuffix))
+    {
+      return zkPath.substring(0, zkPath.length() - fileSuffix.length());
+    }
+    return zkPath;
+  }
+
+  // reads the whole file, joining its lines with "\n" (no trailing newline)
+  private static String readFile(File file) throws IOException
+  {
+    BufferedReader bfr = new BufferedReader(new FileReader(file));
+    try
+    {
+      StringBuilder sb = new StringBuilder();
+      String line;
+      String recordDelimiter = "";
+      while ((line = bfr.readLine()) != null)
+      {
+        sb.append(recordDelimiter).append(line);
+        recordDelimiter = "\n";
+      }
+      return sb.toString();
+    } finally
+    {
+      try
+      {
+        bfr.close();
+      } catch (IOException ignored)
+      {
+        // nothing useful can be done if closing a reader fails
+      }
+    }
+  }
+
+  /**
+   * Recursively copies a znode tree to disk. A znode with children becomes a
+   * directory; a znode without children becomes a file named after the znode
+   * plus the configured suffix, holding the znode data.
+   *
+   * @param zkPath source znode path
+   * @param fsPath target path on the local file system
+   * @throws Exception if reading from Zookeeper or writing the file fails
+   */
+  public void download(String zkPath, String fsPath) throws Exception
+  {
+    
+    List<String> children = client.getChildren(zkPath);
+    if (children != null && children.size() > 0)
+    {
+      new File(fsPath).mkdirs();
+      for (String child : children)
+      {
+        String childPath = zkPath.equals("/")? "/" + child : zkPath + "/" + child;
+        download(childPath, fsPath + "/" + child);
+      }
+    } else
+    {
+      File target = new File(fsPath + suffix);
+      System.out.println("Saving " + zkPath + " to "
+          + target.getCanonicalPath());
+      // a leaf given as the top-level path has no parent created by the
+      // recursion above
+      File parent = target.getParentFile();
+      if (parent != null)
+      {
+        parent.mkdirs();
+      }
+      FileWriter fileWriter = new FileWriter(target);
+      try
+      {
+        Object readData = client.readData(zkPath);
+        if (readData != null)
+        {
+          fileWriter.write((String) readData);
+        }
+      } finally
+      {
+        fileWriter.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZKLogFormatter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZKLogFormatter.java b/helix-core/src/main/java/org/apache/helix/tools/ZKLogFormatter.java
new file mode 100644
index 0000000..ec5e0d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZKLogFormatter.java
@@ -0,0 +1,395 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import javax.xml.bind.annotation.adapters.HexBinaryAdapter;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.persistence.FileHeader;
+import org.apache.zookeeper.server.persistence.FileSnap;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+
+public class ZKLogFormatter
+{
+  private static final Logger LOG = Logger.getLogger(ZKLogFormatter.class);
+  private static DateFormat dateTimeInstance = DateFormat.getDateTimeInstance(
+      DateFormat.SHORT, DateFormat.LONG);
+  private static HexBinaryAdapter adapter = new HexBinaryAdapter();
+  private static String fieldDelim = ":";
+  private static String fieldSep = " ";
+
+  static BufferedWriter bw = null;
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length != 2 && args.length != 3)
+    {
+      System.err.println("USAGE: LogFormatter <log|snapshot> log_file");
+      System.exit(2);
+    }
+    
+    if (args.length == 3)
+    {
+      bw = new BufferedWriter(new FileWriter(new File(args[2])));
+    }
+    
+    if (args[0].equals("log"))
+    {
+      readTransactionLog(args[1]);
+    } else if (args[0].equals("snapshot"))
+    {
+      readSnapshotLog(args[1]);
+    }
+    
+    if (bw != null)
+    {
+      bw.close();
+    }
+  }
+
+  private static void readSnapshotLog(String snapshotPath) throws Exception
+  {
+    FileInputStream fis = new FileInputStream(snapshotPath);
+    BinaryInputArchive ia = BinaryInputArchive.getArchive(fis);
+    Map<Long, Integer> sessions = new HashMap<Long, Integer>();
+    DataTree dt = new DataTree();
+    FileHeader header = new FileHeader();
+    header.deserialize(ia, "fileheader");
+    if (header.getMagic() != FileSnap.SNAP_MAGIC)
+    {
+      throw new IOException("mismatching magic headers " + header.getMagic()
+          + " !=  " + FileSnap.SNAP_MAGIC);
+    }
+    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
+
+    if (bw != null)
+    {
+      bw.write(sessions.toString());
+      bw.newLine();
+    } else
+    {
+      System.out.println(sessions);      
+    }
+    traverse(dt, 1, "/");
+
+  }
+
+  /*
+   * Level order traversal
+   */
+  private static void traverse(DataTree dt, int startId, String startPath) throws Exception
+  {
+    LinkedList<Pair> queue = new LinkedList<Pair>();
+    queue.add(new Pair(startPath, startId));
+    while (!queue.isEmpty())
+    {
+      Pair pair = queue.removeFirst();
+      String path = pair._path;
+      DataNode head = dt.getNode(path);
+      Stat stat = new Stat();
+      byte[] data = null;
+      try
+      {
+        data = dt.getData(path, stat, null);
+      } catch (NoNodeException e)
+      {
+        e.printStackTrace();
+      }
+      // print the node
+      format(startId, pair, head, data);
+      Set<String> children = head.getChildren();
+      if (children != null)
+      {
+        for (String child : children)
+        {
+          String childPath;
+          if (path.endsWith("/"))
+          {
+            childPath = path + child;
+          } else
+          {
+            childPath = path + "/" + child;
+          }
+          queue.add(new Pair(childPath, startId));
+        }
+      }
+      startId = startId + 1;
+    }
+
+  }
+
+  static class Pair
+  {
+
+    private final String _path;
+    private final int _parentId;
+
+    public Pair(String path, int parentId)
+    {
+      _path = path;
+      _parentId = parentId;
+    }
+
+  }
+
+  private static void format(int id, Pair pair, DataNode head, byte[] data) throws Exception
+  {
+    String dataStr = "";
+    if (data != null)
+    {
+      dataStr = new String(data).replaceAll("[\\s]+", "");
+    }
+    StringBuffer sb = new StringBuffer();
+    //@formatter:off
+    sb.append("id").append(fieldDelim).append(id).append(fieldSep);
+    sb.append("parent").append(fieldDelim).append(pair._parentId).append(fieldSep);
+    sb.append("path").append(fieldDelim).append(pair._path).append(fieldSep);
+    sb.append("session").append(fieldDelim).append("0x" +Long.toHexString(head.stat.getEphemeralOwner())).append(fieldSep);
+    sb.append("czxid").append(fieldDelim).append("0x" +Long.toHexString(head.stat.getCzxid())).append(fieldSep);
+    sb.append("ctime").append(fieldDelim).append(head.stat.getCtime()).append(fieldSep);
+    sb.append("mtime").append(fieldDelim).append(head.stat.getMtime()).append(fieldSep);
+    sb.append("cmzxid").append(fieldDelim).append("0x" +Long.toHexString(head.stat.getMzxid())).append(fieldSep);
+    sb.append("pzxid").append(fieldDelim).append("0x" +Long.toHexString(head.stat.getPzxid())).append(fieldSep);
+    sb.append("aversion").append(fieldDelim).append(head.stat.getAversion()).append(fieldSep);
+    sb.append("cversion").append(fieldDelim).append(head.stat.getCversion()).append(fieldSep);
+    sb.append("version").append(fieldDelim).append(head.stat.getVersion()).append(fieldSep);
+    sb.append("data").append(fieldDelim).append(dataStr).append(fieldSep);
+    //@formatter:on
+
+    if (bw != null)
+    {
+      bw.write(sb.toString());
+      bw.newLine();
+    } else
+    {
+      System.out.println(sb);      
+    }
+
+  }
+
+  private static void readTransactionLog(String logfilepath)
+      throws FileNotFoundException, IOException, EOFException
+  {
+    FileInputStream fis = new FileInputStream(logfilepath);
+    BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
+    FileHeader fhdr = new FileHeader();
+    fhdr.deserialize(logStream, "fileheader");
+
+    if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC)
+    {
+      System.err.println("Invalid magic number for " + logfilepath);
+      System.exit(2);
+    }
+
+    if (bw != null)
+    {
+      bw.write("ZooKeeper Transactional Log File with dbid "
+          + fhdr.getDbid() + " txnlog format version " + fhdr.getVersion());
+      bw.newLine();
+    } else
+    {
+      System.out.println("ZooKeeper Transactional Log File with dbid "
+          + fhdr.getDbid() + " txnlog format version " + fhdr.getVersion());
+    }
+
+    
+    int count = 0;
+    while (true)
+    {
+      long crcValue;
+      byte[] bytes;
+      try
+      {
+        crcValue = logStream.readLong("crcvalue");
+
+        bytes = logStream.readBuffer("txnEntry");
+      } catch (EOFException e)
+      {
+        if (bw != null)
+        {
+          bw.write("EOF reached after " + count + " txns.");
+          bw.newLine();
+        } else
+        {
+          System.out.println("EOF reached after " + count + " txns.");
+        }
+
+        break;
+      }
+      if (bytes.length == 0)
+      {
+        // Since we preallocate, we define EOF to be an
+        // empty transaction
+        if (bw != null)
+        {
+          bw.write("EOF reached after " + count + " txns.");
+          bw.newLine();
+        } else
+        {
+          System.out.println("EOF reached after " + count + " txns.");
+        }
+
+        return;
+      }
+      Checksum crc = new Adler32();
+      crc.update(bytes, 0, bytes.length);
+      if (crcValue != crc.getValue())
+      {
+        throw new IOException("CRC doesn't match " + crcValue + " vs "
+            + crc.getValue());
+      }
+      InputArchive iab = BinaryInputArchive
+          .getArchive(new ByteArrayInputStream(bytes));
+      TxnHeader hdr = new TxnHeader();
+      Record txn = SerializeUtils.deserializeTxn(iab, hdr);
+      if (bw != null)
+      {
+        bw.write(formatTransaction(hdr, txn));
+        bw.newLine();
+      } else
+      {
+        System.out.println(formatTransaction(hdr, txn));
+      }
+
+      if (logStream.readByte("EOR") != 'B')
+      {
+        LOG.error("Last transaction was partial.");
+        throw new EOFException("Last transaction was partial.");
+      }
+      count++;
+    }
+  }
+
+  static String op2String(int op)
+  {
+    switch (op)
+    {
+    case OpCode.notification:
+      return "notification";
+    case OpCode.create:
+      return "create";
+    case OpCode.delete:
+      return "delete";
+    case OpCode.exists:
+      return "exists";
+    case OpCode.getData:
+      return "getDate";
+    case OpCode.setData:
+      return "setData";
+    case OpCode.getACL:
+      return "getACL";
+    case OpCode.setACL:
+      return "setACL";
+    case OpCode.getChildren:
+      return "getChildren";
+    case OpCode.getChildren2:
+      return "getChildren2";
+    case OpCode.ping:
+      return "ping";
+    case OpCode.createSession:
+      return "createSession";
+    case OpCode.closeSession:
+      return "closeSession";
+    case OpCode.error:
+      return "error";
+    default:
+      return "unknown " + op;
+    }
+  }
+
+  private static String formatTransaction(TxnHeader header, Record txn)
+  {
+    StringBuilder sb = new StringBuilder();
+
+    sb.append("time").append(fieldDelim).append(header.getTime());
+    sb.append(fieldSep).append("session").append(fieldDelim).append("0x")
+        .append(Long.toHexString(header.getClientId()));
+    sb.append(fieldSep).append("cxid").append(fieldDelim).append("0x")
+        .append(Long.toHexString(header.getCxid()));
+    sb.append(fieldSep).append("zxid").append(fieldDelim).append("0x")
+        .append(Long.toHexString(header.getZxid()));
+    sb.append(fieldSep).append("type").append(fieldDelim)
+        .append(op2String(header.getType()));
+    if (txn != null)
+    {
+      try
+      {
+        byte[] data = null;
+        for (PropertyDescriptor pd : Introspector.getBeanInfo(txn.getClass())
+            .getPropertyDescriptors())
+        {
+          if (pd.getName().equalsIgnoreCase("data"))
+          {
+            data = (byte[]) pd.getReadMethod().invoke(txn);
+            continue;
+          }
+          if (pd.getReadMethod() != null && !"class".equals(pd.getName()))
+          {
+            sb.append(fieldSep)
+                .append(pd.getDisplayName())
+                .append(fieldDelim)
+                .append(
+                    pd.getReadMethod().invoke(txn).toString()
+                        .replaceAll("[\\s]+", ""));
+          }
+        }
+        if (data != null)
+        {
+          sb.append(fieldSep).append("data").append(fieldDelim)
+              .append(new String(data).replaceAll("[\\s]+", ""));
+        }
+      } catch (Exception e)
+      {
+        LOG.error(
+            "Error while retrieving bean property values for " + txn.getClass(),
+            e);
+      }
+    }
+
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
new file mode 100644
index 0000000..010ec17
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
@@ -0,0 +1,491 @@
+package org.apache.helix.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.log4j.Logger;
+
+
+public class ZkLogAnalyzer
+{
+  // log4j logger for this tool.
+  private static final Logger     LOG           = Logger.getLogger(ZkLogAnalyzer.class);
+  // When true, main() echoes each scanned log line (minus its data payload) to
+  // stderr; main() switches it on after seeing "/start_disable" or the close of
+  // a known session.
+  private static boolean          dump          = false;
+  // Turns the "data:" payload of a parsed log line back into a ZNRecord.
+  final static ZNRecordSerializer _deserializer = new ZNRecordSerializer();
+
+  /**
+   * Plain counter holder for one test interval. All counters start at zero
+   * (the Java default for int fields) and are incremented by main() while it
+   * scans the parsed log lines.
+   */
+  static class Stats
+  {
+    // messages created, in total and broken down by state transition
+    int msgSentCount;
+    int msgSentCount_O2S; // Offline to Slave
+    int msgSentCount_S2M; // Slave to Master
+    int msgSentCount_M2S; // Master to Slave
+
+    // messages removed / rewritten
+    int msgDeleteCount;
+    int msgModifyCount;
+
+    // current-state nodes created / updated
+    int curStateCreateCount;
+    int curStateUpdateCount;
+
+    // external-view nodes created / updated
+    int extViewCreateCount;
+    int extViewUpdateCount;
+  }
+
+  /**
+   * Looks up an attribute in a whitespace-separated log line.
+   * <p>
+   * The line is split on single whitespace characters and the first token that
+   * begins with {@code attribute} wins; the remainder of that token (which may
+   * be empty) is returned.
+   *
+   * @param line      the log line to search, may be null
+   * @param attribute the attribute prefix including its delimiter, e.g. "time:"
+   * @return the text following the prefix, or null when the line is null or no
+   *         token starts with the prefix
+   */
+  static String getAttributeValue(String line, String attribute)
+  {
+    if (line != null)
+    {
+      for (String token : line.split("\\s"))
+      {
+        if (token.startsWith(attribute))
+        {
+          return token.substring(attribute.length());
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Picks the current-state update line with the greatest "time:" value inside
+   * the closed interval [start, end].
+   *
+   * @param csUpdateLines candidate log lines; each must carry a numeric "time:"
+   *                      attribute, otherwise Long.parseLong throws
+   * @param start         lower bound of the interval (inclusive)
+   * @param end           upper bound of the interval (inclusive)
+   * @return the matching line with the latest timestamp (the first one seen on
+   *         ties); null if nothing matches and assertions are disabled
+   */
+  static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end)
+  {
+    String latestLine = null;
+    long latestTime = Long.MIN_VALUE;
+    for (String candidate : csUpdateLines)
+    {
+      long time = Long.parseLong(getAttributeValue(candidate, "time:"));
+      if (time < start || time > end)
+      {
+        // outside the requested window
+        continue;
+      }
+      if (time <= latestTime)
+      {
+        // not newer than the best match so far
+        continue;
+      }
+      latestTime = time;
+      latestLine = candidate;
+    }
+    assert (latestLine != null) : "No CS update between " + start + " - " + end;
+    return latestLine;
+  }
+
+  /**
+   * Extracts the "data:" attribute of a parsed log line and deserializes it
+   * with the shared ZNRecordSerializer.
+   *
+   * @param line a parsed log line, may be null
+   * @return whatever the serializer produces for the payload, or null when the
+   *         line has no "data:" attribute
+   */
+  static ZNRecord getZNRecord(String line)
+  {
+    String payload = getAttributeValue(line, "data:");
+    if (payload == null)
+    {
+      return null;
+    }
+    return (ZNRecord) _deserializer.deserialize(payload.getBytes());
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length != 3)
+    {
+      System.err.println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_hhmmss_SSS)");
+      System.exit(1);
+    }
+
+    System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args));
+    // get create-timestamp of "/" + clusterName
+    // find all zk logs after that create-timestamp and parse them
+    // save parsed log in /tmp/zkLogAnalyzor_zklog.parsed0,1,2...
+
+    String zkLogDir = args[0];
+    String clusterName = args[1];
+    // String zkAddr = args[2];
+    String startTimeStr = args[2];
+    // ZkClient zkClient = new ZkClient(zkAddr);
+    // Stat clusterCreateStat = zkClient.getStat("/" + clusterName);
+    SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
+    Date date = formatter.parse(startTimeStr);
+    long startTimeStamp = date.getTime();
+
+    System.out.println(clusterName + " created at " + date);
+    while (zkLogDir.endsWith("/"))
+    {
+      zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1);
+    }
+    if (!zkLogDir.endsWith("/version-2"))
+    {
+      zkLogDir = zkLogDir + "/version-2";
+    }
+    File dir = new File(zkLogDir);
+    File[] zkLogs = dir.listFiles(new FileFilter()
+    {
+
+      @Override
+      public boolean accept(File file)
+      {
+        return file.isFile() && (file.getName().indexOf("log") != -1);
+      }
+    });
+
+    // lastModified time -> zkLog
+    TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>();
+    for (File file : zkLogs)
+    {
+      if (file.lastModified() > startTimeStamp)
+      {
+        lastZkLogs.put(file.lastModified(), file.getAbsolutePath());
+      }
+    }
+
+    List<String> parsedZkLogs = new ArrayList<String>();
+    int i = 0;
+    System.out.println("zk logs last modified later than "
+        + new Timestamp(startTimeStamp));
+    for (Long lastModified : lastZkLogs.keySet())
+    {
+      String fileName = lastZkLogs.get(lastModified);
+      System.out.println(new Timestamp(lastModified) + ": "
+          + (fileName.substring(fileName.lastIndexOf('/') + 1)));
+
+      String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i;
+      i++;
+      ZKLogFormatter.main(new String[] { "log", fileName, parsedFileName });
+      parsedZkLogs.add(parsedFileName);
+    }
+
+    // sessionId -> create liveInstance line
+    Map<String, String> sessionMap = new HashMap<String, String>();
+
+    // message send lines in time order
+    // List<String> sendMessageLines = new ArrayList<String>();
+
+    // CS update lines in time order
+    List<String> csUpdateLines = new ArrayList<String>();
+
+    String leaderSession = null;
+
+    System.out.println();
+    Stats stats = new Stats();
+    long lastTestStartTimestamp = Long.MAX_VALUE;
+    long controllerStartTime = 0;
+    for (String parsedZkLog : parsedZkLogs)
+    {
+
+      FileInputStream fis = new FileInputStream(parsedZkLog);
+      BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+
+      String inputLine;
+      while ((inputLine = br.readLine()) != null)
+      {
+        String timestamp = getAttributeValue(inputLine, "time:");
+        if (timestamp == null)
+        {
+          continue;
+        }
+        long timestampVal = Long.parseLong(timestamp);
+        if (timestampVal < startTimeStamp)
+        {
+          continue;
+        }
+
+        if (dump == true)
+        {
+          String printLine = inputLine.replaceAll("data:.*", "");
+          printLine = new Timestamp(timestampVal) + " " + printLine.substring(printLine.indexOf("session:"));
+          System.err.println(printLine);
+        }
+
+        if (inputLine.indexOf("/start_disable") != -1)
+        {
+          dump = true;
+        }
+        if (inputLine.indexOf("/" + clusterName + "/CONFIGS/CLUSTER/verify") != -1)
+        {
+          String type = getAttributeValue(inputLine, "type:");
+          if (type.equals("delete"))
+          {
+            System.out.println(timestamp + ": verify done");
+            System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp);
+            String lastCSUpdateLine =
+                findLastCSUpdateBetween(csUpdateLines,
+                                        lastTestStartTimestamp,
+                                        timestampVal);
+            long lastCSUpdateTimestamp =
+                Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:"));
+            System.out.println("Last CS Update:" + lastCSUpdateTimestamp);
+
+            System.out.println("state transition latency: "
+                + +(lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms");
+
+            System.out.println("state transition latency since controller start: "
+                + +(lastCSUpdateTimestamp - controllerStartTime) + "ms");
+
+            System.out.println("Create MSG\t" + stats.msgSentCount + "\t"
+                + stats.msgSentCount_O2S + "\t" + stats.msgSentCount_S2M + "\t"
+                + stats.msgSentCount_M2S);
+            System.out.println("Modify MSG\t" + stats.msgModifyCount);
+            System.out.println("Delete MSG\t" + stats.msgDeleteCount);
+            System.out.println("Create CS\t" + stats.curStateCreateCount);
+            System.out.println("Update CS\t" + stats.curStateUpdateCount);
+            System.out.println("Create EV\t" + stats.extViewCreateCount);
+            System.out.println("Update EV\t" + stats.extViewUpdateCount);
+
+            System.out.println();
+            stats = new Stats();
+            lastTestStartTimestamp = Long.MAX_VALUE;
+          }
+        }
+        else if (inputLine.indexOf("/" + clusterName + "/LIVEINSTANCES/") != -1)
+        {
+          // cluster startup
+          if (timestampVal < lastTestStartTimestamp)
+          {
+            System.out.println("START cluster. SETTING lastTestStartTimestamp to "
+                + new Timestamp(timestampVal) + "\nline:" + inputLine);
+            lastTestStartTimestamp = timestampVal;
+          }
+
+          ZNRecord record = getZNRecord(inputLine);
+          LiveInstance liveInstance = new LiveInstance(record);
+          String session = getAttributeValue(inputLine, "session:");
+          sessionMap.put(session, inputLine);
+          System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LIVEINSTANCE "
+              + liveInstance.getInstanceName());
+        }
+        else if (inputLine.indexOf("closeSession") != -1)
+        {
+          // kill any instance
+          String session = getAttributeValue(inputLine, "session:");
+          if (sessionMap.containsKey(session))
+          {
+            if (timestampVal < lastTestStartTimestamp)
+            {
+              System.out.println("KILL node. SETTING lastTestStartTimestamp to " + timestampVal
+                  + " line:" + inputLine);
+              lastTestStartTimestamp = timestampVal;
+            }
+            String line = sessionMap.get(session);
+            ZNRecord record = getZNRecord(line);
+            LiveInstance liveInstance = new LiveInstance(record);
+
+            System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": close session "
+                + liveInstance.getInstanceName());
+            dump = true;
+          }
+        }
+        else if (inputLine.indexOf("/" + clusterName + "/CONFIGS/PARTICIPANT") != -1)
+        {
+          // disable a partition
+          String type = getAttributeValue(inputLine, "type:");
+          if (type.equals("setData") && inputLine.indexOf("HELIX_DISABLED_PARTITION") != -1)
+          {
+            if (timestampVal < lastTestStartTimestamp)
+            {
+              System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to " + timestampVal
+                  + " line:" + inputLine);
+              lastTestStartTimestamp = timestampVal;
+            }
+          }
+        } else if (inputLine.indexOf("/" + clusterName + "/CONTROLLER/LEADER") != -1)
+        {
+          // leaderLine = inputLine;
+          ZNRecord record = getZNRecord(inputLine);
+          LiveInstance liveInstance = new LiveInstance(record);
+          String session = getAttributeValue(inputLine, "session:");
+          leaderSession = session;
+          controllerStartTime = Long.parseLong(getAttributeValue(inputLine, "time:"));
+          sessionMap.put(session, inputLine);
+          System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LEADER "
+              + liveInstance.getInstanceName());
+        }
+        else if (inputLine.indexOf("/" + clusterName + "/") != -1
+            && inputLine.indexOf("/CURRENTSTATES/") != -1)
+        {
+          String type = getAttributeValue(inputLine, "type:");
+          if (type.equals("create"))
+          {
+            stats.curStateCreateCount++;
+          }
+          else if (type.equals("setData"))
+          {
+            String path = getAttributeValue(inputLine, "path:");
+            csUpdateLines.add(inputLine);
+            stats.curStateUpdateCount++;
+            // getAttributeValue(line, "data");
+            System.out.println("Update currentstate:"
+                + new Timestamp(Long.parseLong(timestamp)) + ":" + timestamp + " path:"
+                + path);
+          }
+        }
+        else if (inputLine.indexOf("/" + clusterName + "/EXTERNALVIEW/") != -1)
+        {
+          String session = getAttributeValue(inputLine, "session:");
+          if (session.equals(leaderSession))
+          {
+            String type = getAttributeValue(inputLine, "type:");
+            if (type.equals("create"))
+            {
+              stats.extViewCreateCount++;
+            }
+            else if (type.equals("setData"))
+            {
+              stats.extViewUpdateCount++;
+            }
+          }
+
+          // pos = inputLine.indexOf("EXTERNALVIEW");
+          // pos = inputLine.indexOf("data:{", pos);
+          // if (pos != -1)
+          // {
+          // String timestamp = getAttributeValue(inputLine, "time:");
+          // ZNRecord record =
+          // (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+          // .getBytes());
+          // ExternalView extView = new ExternalView(record);
+          // int masterCnt = ClusterStateVerifier.countStateNbInExtView(extView,
+          // "MASTER");
+          // int slaveCnt = ClusterStateVerifier.countStateNbInExtView(extView, "SLAVE");
+          // if (masterCnt == 1200)
+          // {
+          // System.out.println(timestamp + ": externalView " + extView.getResourceName()
+          // + " has " + masterCnt + " MASTER, " + slaveCnt + " SLAVE");
+          // }
+          // }
+        }
+        else if (inputLine.indexOf("/" + clusterName + "/") != -1
+            && inputLine.indexOf("/MESSAGES/") != -1)
+        {
+          String type = getAttributeValue(inputLine, "type:");
+
+          if (type.equals("create"))
+          {
+            ZNRecord record = getZNRecord(inputLine);
+            Message msg = new Message(record);
+            String sendSession = getAttributeValue(inputLine, "session:");
+            if (sendSession.equals(leaderSession)
+                && msg.getMsgType().equals("STATE_TRANSITION")
+                && msg.getMsgState() == MessageState.NEW)
+            {
+              // sendMessageLines.add(inputLine);
+              stats.msgSentCount++;
+
+              if (msg.getFromState().equals("OFFLINE")
+                  && msg.getToState().equals("SLAVE"))
+              {
+                stats.msgSentCount_O2S++;
+              }
+              else if (msg.getFromState().equals("SLAVE")
+                  && msg.getToState().equals("MASTER"))
+              {
+                stats.msgSentCount_S2M++;
+              }
+              else if (msg.getFromState().equals("MASTER")
+                  && msg.getToState().equals("SLAVE"))
+              {
+                stats.msgSentCount_M2S++;
+              }
+              // System.out.println("Message create:"+new
+              // Timestamp(Long.parseLong(timestamp)));
+            }
+
+            // pos = inputLine.indexOf("MESSAGES");
+            // pos = inputLine.indexOf("data:{", pos);
+            // if (pos != -1)
+            // {
+            //
+            // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
+            // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
+            // Message msg = new Message(record);
+            // MessageState msgState = msg.getMsgState();
+            // String msgType = msg.getMsgType();
+            // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.NEW)
+            // {
+            // if (!msgs.containsKey(msg.getMsgId()))
+            // {
+            // msgs.put(msg.getMsgId(), new MsgItem(Long.parseLong(timestamp), msg));
+            // }
+            // else
+            // {
+            // LOG.error("msg: " + msg.getMsgId() + " already sent");
+            // }
+            //
+            // System.out.println(timestamp + ": sendMsg " + msg.getPartitionName() + "("
+            // + msg.getFromState() + "->" + msg.getToState() + ") to "
+            // + msg.getTgtName() + ", size: " + msgBytes.length);
+            // }
+            // }
+          }
+          else if (type.equals("setData"))
+          {
+            stats.msgModifyCount++;
+            // pos = inputLine.indexOf("MESSAGES");
+            // pos = inputLine.indexOf("data:{", pos);
+            // if (pos != -1)
+            // {
+            //
+            // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
+            // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
+            // Message msg = new Message(record);
+            // MessageState msgState = msg.getMsgState();
+            // String msgType = msg.getMsgType();
+            // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.READ)
+            // {
+            // if (!msgs.containsKey(msg.getMsgId()))
+            // {
+            // LOG.error("msg: " + msg.getMsgId() + " never sent");
+            // }
+            // else
+            // {
+            // MsgItem msgItem = msgs.get(msg.getMsgId());
+            // if (msgItem.readTime == 0)
+            // {
+            // msgItem.readTime = Long.parseLong(timestamp);
+            // msgs.put(msg.getMsgId(), msgItem);
+            // // System.out.println(timestamp + ": readMsg " + msg.getPartitionName()
+            // // + "("
+            // // + msg.getFromState() + "->" + msg.getToState() + ") to "
+            // // + msg.getTgtName() + ", latency: " + (msgItem.readTime -
+            // // msgItem.sendTime));
+            // }
+            // }
+            //
+            // }
+            // }
+          }
+          else if (type.equals("delete"))
+          {
+            stats.msgDeleteCount++;
+            // String msgId = path.substring(path.lastIndexOf('/') + 1);
+            // if (msgs.containsKey(msgId))
+            // {
+            // MsgItem msgItem = msgs.get(msgId);
+            // Message msg = msgItem.msg;
+            // msgItem.deleteTime = Long.parseLong(timestamp);
+            // msgs.put(msgId, msgItem);
+            // msgItem.latency = msgItem.deleteTime - msgItem.sendTime;
+            // System.out.println(timestamp + ": delMsg " + msg.getPartitionName() + "("
+            // + msg.getFromState() + "->" + msg.getToState() + ") to "
+            // + msg.getTgtName() + ", latency: " + msgItem.latency);
+            // }
+            // else
+            // {
+            // // messages other than STATE_TRANSITION message
+            // // LOG.error("msg: " + msgId + " never sent");
+            // }
+          }
+        }
+      } // end of [br.readLine()) != null]
+    }
+  }
+}