Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[24/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
new file mode 100644
index 0000000..62313a4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
@@ -0,0 +1,1029 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * A test is structured logically as a list of commands. A command has three parts:
+ * COMMAND | TRIGGER | ARGS. COMMAND can be modify, verify, start, or stop.
+ *
+ * TRIGGER is optional and consists of start-time, timeout, and expect-value; the
+ * COMMAND fires within [start-time, start-time + timeout], once the value of concern
+ * equals expect-value.
+ *
+ * The ARGS format depends on COMMAND. For modify/verify, the arg has the form
+ * <znode-path, property-type (SIMPLE, LIST, or MAP), operation (+, -, ==, !=), key,
+ * update-value>, where key is key1 for a SIMPLE field, key1/index for a LIST field,
+ * and key1/key2 for a MAP field. For start/stop, the arg is a thread handler.
+ *
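+ * For example (illustrative values only; actual commands are built through the
+ * TestCommand API rather than this textual form), a verify command on a map field
+ * could look like:
+ *
+ *   verify | <start-time=0ms, timeout=1000ms> | </testPath, MAP, ==, key1/key2, "v">
+ *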
+ * @author zzhang
+ *
+ */
+
+public class TestExecutor
+{
+ /**
+ * SIMPLE: simple field change
+ * LIST: list field change
+ * MAP: map field change
+ * ZNODE: entire znode change
+ */
+ public enum ZnodePropertyType
+ {
+ SIMPLE, LIST, MAP, ZNODE
+ }
+
+ private enum ZnodeModValueType
+ {
+ INVALID, SINGLE_VALUE, LIST_VALUE, MAP_VALUE, ZNODE_VALUE
+ }
+
+ private static Logger logger = Logger.getLogger(TestExecutor.class);
+ private static final long SLEEP_TIME = 500; // in ms
+
+ private final static PropertyJsonComparator<String> STRING_COMPARATOR =
+ new PropertyJsonComparator<String>(String.class);
+ private final static PropertyJsonSerializer<ZNRecord> ZNRECORD_SERIALIZER =
+ new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
+
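+ // Maps a (property-type, key) pair to how its value is read and compared.
+ // Illustrative examples: ("k1", SIMPLE) -> SINGLE_VALUE; ("k1", LIST) -> LIST_VALUE;
+ // ("k1/0", LIST) -> SINGLE_VALUE; ("k1/k2", MAP) -> SINGLE_VALUE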
+ private static ZnodeModValueType getValueType(ZnodePropertyType type, String key)
+ {
+ ZnodeModValueType valueType = ZnodeModValueType.INVALID;
+ switch (type)
+ {
+ case SIMPLE:
+ if (key == null)
+ {
+ logger.warn("invalid key for simple field: key is null");
+ }
+ else
+ {
+ String keyParts[] = key.split("/");
+ if (keyParts.length != 1)
+ {
+ logger.warn("invalid key for simple field: " + key
+ + ", expect 1 part: key1 (no slash)");
+ }
+ else
+ {
+ valueType = ZnodeModValueType.SINGLE_VALUE;
+ }
+ }
+ break;
+ case LIST:
+ if (key == null)
+ {
+ logger.warn("invalid key for simple field: key is null");
+ }
+ else
+ {
+ String keyParts[] = key.split("/");
+ if (keyParts.length < 1 || keyParts.length > 2)
+ {
+ logger.warn("invalid key for list field: " + key
+ + ", expect 1 or 2 parts: key1 or key1/index)");
+ }
+ else if (keyParts.length == 1)
+ {
+ valueType = ZnodeModValueType.LIST_VALUE;
+ }
+ else
+ {
+ try
+ {
+ int index = Integer.parseInt(keyParts[1]);
+ if (index < 0)
+ {
+ logger.warn("invalid key for list field: " + key + ", index < 0");
+ }
+ else
+ {
+ valueType = ZnodeModValueType.SINGLE_VALUE;
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ logger.warn("invalid key for list field: " + key
+ + ", part-2 is NOT an integer");
+ }
+ }
+ }
+ break;
+ case MAP:
+ if (key == null)
+ {
+ logger.warn("invalid key for simple field: key is null");
+ }
+ else
+ {
+ String keyParts[] = key.split("/");
+ if (keyParts.length < 1 || keyParts.length > 2)
+ {
+ logger.warn("invalid key for map field: " + key
+ + ", expect 1 or 2 parts: key1 or key1/key2)");
+ }
+ else if (keyParts.length == 1)
+ {
+ valueType = ZnodeModValueType.MAP_VALUE;
+ }
+ else
+ {
+ valueType = ZnodeModValueType.SINGLE_VALUE;
+ }
+ }
+ break;
+ case ZNODE:
+ valueType = ZnodeModValueType.ZNODE_VALUE;
+ break;
+ default:
+ break;
+ }
+ return valueType;
+ }
+
+ private static String getSingleValue(ZNRecord record, ZnodePropertyType type, String key)
+ {
+ if (record == null || key == null)
+ {
+ return null;
+ }
+
+ String value = null;
+ String keyParts[] = key.split("/");
+
+ switch (type)
+ {
+ case SIMPLE:
+ value = record.getSimpleField(key);
+ break;
+ case LIST:
+ List<String> list = record.getListField(keyParts[0]);
+ if (list == null)
+ {
+ logger.warn("invalid key for list field: " + key
+ + ", map for key part-1 doesn't exist");
+ return null;
+ }
+ int idx = Integer.parseInt(keyParts[1]);
+ value = list.get(idx);
+ break;
+ case MAP:
+ Map<String, String> map = record.getMapField(keyParts[0]);
+ if (map == null)
+ {
+ logger.warn("invalid key for map field: " + key
+ + ", map for key part-1 doesn't exist");
+ return null;
+ }
+ value = map.get(keyParts[1]);
+ break;
+ default:
+ break;
+ }
+
+ return value;
+ }
+
+ private static List<String> getListValue(ZNRecord record, String key)
+ {
+ if (record == null)
+ {
+ return null;
+ }
+ return record.getListField(key);
+ }
+
+ private static Map<String, String> getMapValue(ZNRecord record, String key)
+ {
+ return record.getMapField(key);
+ }
+
+ // comparators for single/list/map values
+ private static boolean compareSingleValue(String actual,
+ String expect,
+ String key,
+ ZNRecord diff)
+ {
+ boolean ret = (STRING_COMPARATOR.compare(actual, expect) == 0);
+
+ if (diff != null)
+ {
+ diff.setSimpleField(key + "/expect", expect);
+ diff.setSimpleField(key + "/actual", actual);
+ }
+ return ret;
+ }
+
+ private static boolean compareListValue(List<String> actualList,
+ List<String> expectList,
+ String key,
+ ZNRecord diff)
+ {
+ boolean ret = true;
+ if (actualList == null && expectList == null)
+ {
+ ret = true;
+ }
+ else if (actualList == null && expectList != null)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ diff.setListField(key + "/expect", expectList);
+ }
+ }
+ else if (actualList != null && expectList == null)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ diff.setListField(key + "/actual", actualList);
+ }
+ }
+ else
+ {
+ Iterator<String> itrActual = actualList.iterator();
+ Iterator<String> itrExpect = expectList.iterator();
+ if (diff != null && diff.getListField(key + "/expect") == null)
+ {
+ diff.setListField(key + "/expect", new ArrayList<String>());
+ }
+
+ if (diff != null && diff.getListField(key + "/actual") == null)
+ {
+ diff.setListField(key + "/actual", new ArrayList<String>());
+ }
+
+ while (itrActual.hasNext() && itrExpect.hasNext())
+ {
+ String actual = itrActual.next();
+ String expect = itrExpect.next();
+
+ if (STRING_COMPARATOR.compare(actual, expect) != 0)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ diff.getListField(key + "/expect").add(expect);
+ diff.getListField(key + "/actual").add(actual);
+ }
+ }
+ }
+
+ while (itrActual.hasNext())
+ {
+ String actual = itrActual.next();
+ if (diff != null)
+ {
+ diff.getListField(key + "/actual").add(actual);
+ }
+ }
+
+ while (itrExpect.hasNext())
+ {
+ String expect = itrExpect.next();
+ if (diff != null)
+ {
+ diff.getListField(key + "/expect").add(expect);
+ }
+ }
+ }
+ return ret;
+ }
+
+ private static void setMapField(ZNRecord record, String key1, String key2, String value)
+ {
+ if (record.getMapField(key1) == null)
+ {
+ record.setMapField(key1, new TreeMap<String, String>());
+ }
+ record.getMapField(key1).put(key2, value);
+ }
+
+ private static boolean compareMapValue(Map<String, String> actualMap,
+ Map<String, String> expectMap,
+ String mapKey,
+ ZNRecord diff)
+ {
+ boolean ret = true;
+ if (actualMap == null && expectMap == null)
+ {
+ ret = true;
+ }
+ else if (actualMap == null && expectMap != null)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ diff.setMapField(mapKey + "/expect", expectMap);
+ }
+ }
+ else if (actualMap != null && expectMap == null)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ diff.setMapField(mapKey + "/actual", actualMap);
+ }
+
+ }
+ else
+ {
+ for (String key : actualMap.keySet())
+ {
+ String actual = actualMap.get(key);
+ if (!expectMap.containsKey(key))
+ {
+ ret = false;
+
+ if (diff != null)
+ {
+ setMapField(diff, mapKey + "/actual", key, actual);
+ }
+ }
+ else
+ {
+ String expect = expectMap.get(key);
+ if (STRING_COMPARATOR.compare(actual, expect) != 0)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ setMapField(diff, mapKey + "/actual", key, actual);
+ setMapField(diff, mapKey + "/expect", key, expect);
+ }
+ }
+ }
+ }
+
+ for (String key : expectMap.keySet())
+ {
+ String expect = expectMap.get(key);
+ if (!actualMap.containsKey(key))
+ {
+ ret = false;
+
+ if (diff != null)
+ {
+ setMapField(diff, mapKey + "/expect", key, expect);
+ }
+ }
+ else
+ {
+ String actual = actualMap.get(key);
+ if (STRING_COMPARATOR.compare(actual, expect) != 0)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ setMapField(diff, mapKey + "/actual", key, actual);
+ setMapField(diff, mapKey + "/expect", key, expect);
+ }
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ private static void setZNRecord(ZNRecord diff, ZNRecord record, String keySuffix)
+ {
+ if (diff == null || record == null)
+ {
+ return;
+ }
+
+ for (String key : record.getSimpleFields().keySet())
+ {
+ diff.setSimpleField(key + "/" + keySuffix, record.getSimpleField(key));
+ }
+
+ for (String key : record.getListFields().keySet())
+ {
+ diff.setListField(key + "/" + keySuffix, record.getListField(key));
+ }
+
+ for (String key : record.getMapFields().keySet())
+ {
+ diff.setMapField(key + "/" + keySuffix, record.getMapField(key));
+ }
+ }
+
+ private static boolean compareZnodeValue(ZNRecord actual, ZNRecord expect, ZNRecord diff)
+ {
+ boolean ret = true;
+ if (actual == null && expect == null)
+ {
+ ret = true;
+ }
+ else if (actual == null && expect != null)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ setZNRecord(diff, expect, "expect");
+ }
+ }
+ else if (actual != null && expect == null)
+ {
+ ret = false;
+ if (diff != null)
+ {
+ setZNRecord(diff, actual, "actual");
+ }
+ }
+ else
+ {
+ for (String key : actual.getSimpleFields().keySet())
+ {
+ if (compareSingleValue(actual.getSimpleField(key),
+ expect.getSimpleField(key),
+ key,
+ diff) == false)
+ {
+ ret = false;
+ }
+ }
+
+ for (String key : expect.getMapFields().keySet())
+ {
+ if (!actual.getMapFields().containsKey(key))
+ {
+ ret = false;
+ if (diff != null)
+ {
+ diff.setMapField(key + "/expect", expect.getMapField(key));
+ }
+ }
+ else
+ {
+ if (compareMapValue(actual.getMapField(key), expect.getMapField(key), key, diff) == false)
+ {
+ ret = false;
+ }
+ }
+ }
+
+ for (String key : actual.getMapFields().keySet())
+ {
+ if (!expect.getMapFields().containsKey(key))
+ {
+ ret = false;
+ if (diff != null)
+ {
+ diff.setMapField(key + "/actual", actual.getMapField(key));
+ }
+ }
+ else
+ {
+ if (compareMapValue(actual.getMapField(key), expect.getMapField(key), key, diff) == false)
+ {
+ ret = false;
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ private static void resetZNRecord(ZNRecord record)
+ {
+ if (record != null)
+ {
+ record.getSimpleFields().clear();
+ record.getListFields().clear();
+ record.getMapFields().clear();
+ }
+ }
+
+ private static boolean isValueExpected(ZNRecord current,
+ ZnodePropertyType type,
+ String key,
+ ZnodeValue expect,
+ ZNRecord diff)
+ {
+ // a null expect value means no particular value is expected
+ if (expect == null)
+ {
+ return true;
+ }
+
+ boolean result = false;
+ resetZNRecord(diff);
+ ZnodeModValueType valueType = getValueType(type, key);
+ switch (valueType)
+ {
+ case SINGLE_VALUE:
+ String singleValue = getSingleValue(current, type, key);
+ result = compareSingleValue(singleValue, expect._singleValue, key, diff);
+ break;
+ case LIST_VALUE:
+ List<String> listValue = getListValue(current, key);
+ result = compareListValue(listValue, expect._listValue, key, diff);
+ break;
+ case MAP_VALUE:
+ Map<String, String> mapValue = getMapValue(current, key);
+ result = compareMapValue(mapValue, expect._mapValue, key, diff);
+ break;
+ case ZNODE_VALUE:
+ result = compareZnodeValue(current, expect._znodeValue, diff);
+ break;
+ case INVALID:
+ break;
+ default:
+ break;
+ }
+ return result;
+ }
+
+ private static void setSingleValue(ZNRecord record,
+ ZnodePropertyType type,
+ String key,
+ String value)
+ {
+ String keyParts[] = key.split("/");
+
+ switch (type)
+ {
+ case SIMPLE:
+ record.setSimpleField(key, value);
+ break;
+ case LIST:
+ List<String> list = record.getListField(keyParts[0]);
+ if (list == null)
+ {
+ logger.warn("invalid key for list field: " + key
+ + ", value for key part-1 doesn't exist");
+ return;
+ }
+ int idx = Integer.parseInt(keyParts[1]);
+ list.remove(idx);
+ list.add(idx, value);
+ break;
+ case MAP:
+ Map<String, String> map = record.getMapField(keyParts[0]);
+ if (map == null)
+ {
+ logger.warn("invalid key for map field: " + key
+ + ", value for key part-1 doesn't exist");
+ return;
+ }
+ map.put(keyParts[1], value);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private static void setListValue(ZNRecord record, String key, List<String> value)
+ {
+ record.setListField(key, value);
+ }
+
+ private static void setMapValue(ZNRecord record, String key, Map<String, String> value)
+ {
+ record.setMapField(key, value);
+ }
+
+ private static void removeSingleValue(ZNRecord record,
+ ZnodePropertyType type,
+ String key)
+ {
+ if (record == null)
+ {
+ return;
+ }
+
+ String keyParts[] = key.split("/");
+ switch (type)
+ {
+ case SIMPLE:
+ record.getSimpleFields().remove(key);
+ break;
+ case LIST:
+ List<String> list = record.getListField(keyParts[0]);
+ if (list == null)
+ {
+ logger.warn("invalid key for list field: " + key
+ + ", value for key part-1 doesn't exist");
+ return;
+ }
+ int idx = Integer.parseInt(keyParts[1]);
+ list.remove(idx);
+ break;
+ case MAP:
+ Map<String, String> map = record.getMapField(keyParts[0]);
+ if (map == null)
+ {
+ logger.warn("invalid key for map field: " + key
+ + ", value for key part-1 doesn't exist");
+ return;
+ }
+ map.remove(keyParts[1]);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private static void removeListValue(ZNRecord record, String key)
+ {
+ if (record != null && record.getListFields() != null)
+ {
+ record.getListFields().remove(key);
+ }
+ }
+
+ private static void removeMapValue(ZNRecord record, String key)
+ {
+ if (record != null && record.getMapFields() != null)
+ {
+ record.getMapFields().remove(key);
+ }
+ }
+
+ private static boolean executeVerifier(ZNRecord actual,
+ TestCommand command,
+ ZNRecord diff)
+ {
+ final ZnodeOpArg arg = command._znodeOpArg;
+ final ZnodeValue expectValue = command._trigger._expectValue;
+
+ boolean result =
+ isValueExpected(actual, arg._propertyType, arg._key, expectValue, diff);
+ String operation = arg._operation;
+ if (operation.equals("!="))
+ {
+ result = !result;
+ }
+ else if (!operation.equals("=="))
+ {
+ logger.warn("fail to execute (unsupport operation=" + operation + "):" + operation);
+ result = false;
+ }
+
+ return result;
+ }
+
+ private static boolean compareAndSetZnode(ZnodeValue expect,
+ ZnodeOpArg arg,
+ ZkClient zkClient,
+ ZNRecord diff)
+ {
+ String path = arg._znodePath;
+ ZnodePropertyType type = arg._propertyType;
+ String key = arg._key;
+ boolean success = true;
+
+ // retry 3 times in case there are write conflicts
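+ // compare-and-set sketch: read the znode and its version, apply the update
+ // locally, then write back conditioned on that version; a concurrent writer
+ // causes ZkBadVersionException, after which we back off (doubling) and retry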
+ long backoffTime = 20; // ms
+ for (int i = 0; i < 3; i++)
+ {
+ try
+ {
+ Stat stat = new Stat();
+ ZNRecord record = zkClient.<ZNRecord> readDataAndStat(path, stat, true);
+
+ if (isValueExpected(record, type, key, expect, diff))
+ {
+ if (arg._operation.compareTo("+") == 0)
+ {
+ if (record == null)
+ {
+ record = new ZNRecord("default");
+ }
+ ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
+ switch (valueType)
+ {
+ case SINGLE_VALUE:
+ setSingleValue(record,
+ arg._propertyType,
+ arg._key,
+ arg._updateValue._singleValue);
+ break;
+ case LIST_VALUE:
+ setListValue(record, arg._key, arg._updateValue._listValue);
+ break;
+ case MAP_VALUE:
+ setMapValue(record, arg._key, arg._updateValue._mapValue);
+ break;
+ case ZNODE_VALUE:
+ // deep copy
+ record =
+ ZNRECORD_SERIALIZER.deserialize(ZNRECORD_SERIALIZER.serialize(arg._updateValue._znodeValue));
+ break;
+ case INVALID:
+ break;
+ default:
+ break;
+ }
+ }
+ else if (arg._operation.compareTo("-") == 0)
+ {
+ ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
+ switch (valueType)
+ {
+ case SINGLE_VALUE:
+ removeSingleValue(record, arg._propertyType, arg._key);
+ break;
+ case LIST_VALUE:
+ removeListValue(record, arg._key);
+ break;
+ case MAP_VALUE:
+ removeMapValue(record, arg._key);
+ break;
+ case ZNODE_VALUE:
+ record = null;
+ break;
+ case INVALID:
+ break;
+ default:
+ break;
+ }
+ }
+ else
+ {
+ logger.warn("fail to execute (unsupport operation): " + arg._operation);
+ success = false;
+ }
+
+ if (success == true)
+ {
+ if (record == null)
+ {
+ zkClient.delete(path);
+ }
+ else
+ {
+ try
+ {
+ zkClient.createPersistent(path, true);
+ }
+ catch (ZkNodeExistsException e)
+ {
+ // OK
+ }
+ zkClient.writeData(path, record, stat.getVersion());
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ catch (ZkBadVersionException e)
+ {
+ // write conflict: another writer updated the znode first; back off and retry
+ }
+ catch (PropertyStoreException e)
+ {
+ // znode (de)serialization failed; back off and retry
+ }
+
+ try
+ {
+ Thread.sleep(backoffTime);
+ }
+ catch (InterruptedException e)
+ {
+ // interrupted during backoff; log and continue retrying
+ e.printStackTrace();
+ }
+ backoffTime *= 2;
+ }
+
+ return false;
+ }
+
+ private static class ExecuteCommand implements Runnable
+ {
+ private final TestCommand _command;
+ private final long _startTime;
+ private final ZkClient _zkClient;
+ private final CountDownLatch _countDown;
+ private final Map<TestCommand, Boolean> _testResults;
+
+ public ExecuteCommand(long startTime,
+ TestCommand command,
+ CountDownLatch countDown,
+ ZkClient zkClient,
+ Map<TestCommand, Boolean> testResults)
+ {
+ _startTime = startTime;
+ _command = command;
+ _countDown = countDown;
+ _zkClient = zkClient;
+ _testResults = testResults;
+ }
+
+ @Override
+ public void run()
+ {
+ boolean result = false;
+ long now = System.currentTimeMillis();
+ final long timeout = now + _command._trigger._timeout;
+ ZNRecord diff = new ZNRecord("diff");
+ try
+ {
+ if (now < _startTime)
+ {
+ Thread.sleep(_startTime - now);
+ }
+
+ do
+ {
+ if (_command._commandType == CommandType.MODIFY)
+ {
+ ZnodeOpArg arg = _command._znodeOpArg;
+ final ZnodeValue expectValue = _command._trigger._expectValue;
+ result = compareAndSetZnode(expectValue, arg, _zkClient, diff);
+ // logger.error("result:" + result + ", " + _command);
+
+ if (result == true)
+ {
+ _command._finishTimestamp = System.currentTimeMillis();
+ _testResults.put(_command, true);
+
+ break;
+ }
+ else
+ {
+ // logger.error("result:" + result + ", diff:" + diff);
+ }
+ }
+ else if (_command._commandType == CommandType.VERIFY)
+ {
+ ZnodeOpArg arg = _command._znodeOpArg;
+ final String znodePath = arg._znodePath;
+ ZNRecord record = _zkClient.<ZNRecord> readData(znodePath, true);
+
+ result = executeVerifier(record, _command, diff);
+ // logger.error("result:" + result + ", " + _command.toString());
+ if (result == true)
+ {
+ _command._finishTimestamp = System.currentTimeMillis();
+ _testResults.put(_command, true);
+ break;
+ }
+ else
+ {
+ // logger.error("result:" + result + ", diff:" + diff);
+ }
+ }
+ else if (_command._commandType == CommandType.START)
+ {
+ // TODO add data trigger for START command
+ Thread thread = _command._nodeOpArg._thread;
+ thread.start();
+
+ result = true;
+ _command._finishTimestamp = System.currentTimeMillis();
+ logger.info("result:" + result + ", " + _command.toString());
+ _testResults.put(_command, true);
+ break;
+ }
+ else if (_command._commandType == CommandType.STOP)
+ {
+ // TODO add data trigger for STOP command
+ HelixManager manager = _command._nodeOpArg._manager;
+ manager.disconnect();
+ Thread thread = _command._nodeOpArg._thread;
+ thread.interrupt();
+
+ // System.err.println("stop " +
+ // _command._nodeOpArg._manager.getInstanceName());
+ result = true;
+ _command._finishTimestamp = System.currentTimeMillis();
+ logger.info("result:" + result + ", " + _command.toString());
+ _testResults.put(_command, true);
+ break;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupport command type (was "
+ + _command._commandType + ")");
+ }
+
+ Thread.sleep(SLEEP_TIME);
+
+ now = System.currentTimeMillis();
+ }
+ while (now <= timeout);
+ }
+ catch (Exception e)
+ {
+ // unexpected failure; log it and let the finally block record the result
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (result == false)
+ {
+ _command._finishTimestamp = System.currentTimeMillis();
+ logger.error("result:" + result + ", diff: " + diff);
+ }
+ _countDown.countDown();
+ if (_countDown.getCount() == 0)
+ {
+ if (_zkClient != null && _zkClient.getConnection() != null)
+
+ {
+ _zkClient.close();
+ }
+ }
+ }
+ }
+ }
+
+ private static Map<TestCommand, Boolean> executeTestHelper(List<TestCommand> commandList,
+ String zkAddr,
+ CountDownLatch countDown)
+ {
+
+ final Map<TestCommand, Boolean> testResults =
+ new ConcurrentHashMap<TestCommand, Boolean>();
+ ZkClient zkClient = new ZkClient(zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ // sort on trigger's start time, stable sort
+ Collections.sort(commandList, new Comparator<TestCommand>()
+ {
+ @Override
+ public int compare(TestCommand o1, TestCommand o2)
+ {
+ // avoid int overflow from casting a long difference
+ long delta = o1._trigger._startTime - o2._trigger._startTime;
+ return delta < 0 ? -1 : (delta > 0 ? 1 : 0);
+ }
+ });
+
+ for (TestCommand command : commandList)
+ {
+ testResults.put(command, Boolean.FALSE);
+
+ TestTrigger trigger = command._trigger;
+ command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
+ new Thread(new ExecuteCommand(command._startTimestamp,
+ command,
+ countDown,
+ zkClient,
+ testResults)).start();
+ }
+
+ return testResults;
+ }
+
+ public static void executeTestAsync(List<TestCommand> commandList, String zkAddr) throws InterruptedException
+ {
+ CountDownLatch countDown = new CountDownLatch(commandList.size());
+ executeTestHelper(commandList, zkAddr, countDown);
+ }
+
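+ // Illustrative usage sketch (hypothetical; building the command list depends on
+ // the TestCommand/ZnodeOpArg API defined elsewhere):
+ //   List<TestCommand> commands = new ArrayList<TestCommand>();
+ //   ... add modify/verify/start/stop commands ...
+ //   Map<TestCommand, Boolean> results = executeTest(commands, "localhost:2181");
+ //   boolean allPassed = !results.containsValue(Boolean.FALSE);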
+ public static Map<TestCommand, Boolean> executeTest(List<TestCommand> commandList,
+ String zkAddr) throws InterruptedException
+ {
+ final CountDownLatch countDown = new CountDownLatch(commandList.size());
+ Map<TestCommand, Boolean> testResults =
+ executeTestHelper(commandList, zkAddr, countDown);
+
+ // TODO add timeout
+ countDown.await();
+
+ return testResults;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/TestTrigger.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestTrigger.java b/helix-core/src/main/java/org/apache/helix/tools/TestTrigger.java
new file mode 100644
index 0000000..31c44ab
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestTrigger.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+
+
+public class TestTrigger
+{
+ public long _startTime;
+ public long _timeout;
+ public ZnodeValue _expectValue;
+
+ /**
+ * no time or data trigger
+ */
+ public TestTrigger()
+ {
+ this(0, 0, (ZnodeValue)null);
+ }
+
+ /**
+ * time trigger with a start time, no data trigger
+ * @param startTime
+ */
+ public TestTrigger(long startTime)
+ {
+ this(startTime, 0, (ZnodeValue)null);
+ }
+
+ /**
+ * simple field data trigger
+ * @param expect
+ */
+ public TestTrigger(long startTime, long timeout, String expect)
+ {
+ this(startTime, timeout, new ZnodeValue(expect));
+ }
+
+ /**
+ * list field data trigger
+ * @param expect
+ */
+ public TestTrigger(long startTime, long timeout, List<String> expect)
+ {
+ this(startTime, timeout, new ZnodeValue(expect));
+ }
+
+ /**
+ * map field data trigger
+ * @param expect
+ */
+ public TestTrigger(long startTime, long timeout, Map<String, String> expect)
+ {
+ this(startTime, timeout, new ZnodeValue(expect));
+ }
+
+ /**
+ * znode data trigger
+ * @param expect
+ */
+ public TestTrigger(long startTime, long timeout, ZNRecord expect)
+ {
+ this(startTime, timeout, new ZnodeValue(expect));
+ }
+
+ /**
+ *
+ * @param startTime
+ * @param timeout
+ * @param expect
+ */
+ public TestTrigger(long startTime, long timeout, ZnodeValue expect)
+ {
+ _startTime = startTime;
+ _timeout = timeout;
+ _expectValue = expect;
+ }
+
+ @Override
+ public String toString()
+ {
+ String ret = "<" + _startTime + "~" + _timeout + "ms, " + _expectValue + ">";
+ return ret;
+ }
+
+ // TODO temp test; remove it
+ /*
+ public static void main(String[] args)
+ {
+ TestTrigger trigger = new TestTrigger(0, 0, "simpleValue0");
+ System.out.println("trigger=" + trigger);
+
+ List<String> list = new ArrayList<String>();
+ list.add("listValue1");
+ list.add("listValue2");
+ trigger = new TestTrigger(0, 0, list);
+ System.out.println("trigger=" + trigger);
+
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("mapKey3", "mapValue3");
+ map.put("mapKey4", "mapValue4");
+ trigger = new TestTrigger(0, 0, map);
+ System.out.println("trigger=" + trigger);
+
+ trigger = new TestTrigger();
+ System.out.println("trigger=" + trigger);
+ }
+ */
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/YAISCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/YAISCalculator.java b/helix-core/src/main/java/org/apache/helix/tools/YAISCalculator.java
new file mode 100644
index 0000000..30b25b6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/YAISCalculator.java
@@ -0,0 +1,202 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+public class YAISCalculator
+{
+ static class Assignment
+ {
+ private final int numNodes;
+ private final int replication;
+ Partition[] partitions;
+ Node[] nodes;
+
+ public Assignment(int numNodes, int numPartitions, int replication)
+ {
+ this.numNodes = numNodes;
+ this.replication = replication;
+ partitions = new Partition[numPartitions];
+ for (int i = 0; i < numPartitions; i++)
+ {
+ partitions[i] = new Partition(i, replication);
+ }
+ nodes = new Node[numNodes];
+ for (int i = 0; i < numNodes; i++)
+ {
+ nodes[i] = new Node(replication);
+ }
+ }
+
+ public void assign(int partitionId, int replicaId, int nodeId)
+ {
+ System.out.println("Assigning (" + partitionId + "," + replicaId
+ + ") to " + nodeId);
+ partitions[partitionId].nodeIds[replicaId] = nodeId;
+ nodes[nodeId].partitionLists.get(replicaId).push(partitionId);
+ }
+
+ public void unassign(int partitionId, int replicaId)
+ {
+
+ }
+
+ Integer[] getPartitionsPerNode(int nodeId, int replicaId)
+ {
+ List<Integer> partitionsList = new ArrayList<Integer>();
+ for (Partition p : partitions)
+ {
+ if (p.nodeIds[replicaId] == nodeId)
+ {
+ partitionsList.add(p.partitionId);
+ }
+ }
+ Integer[] array = new Integer[partitionsList.size()];
+ partitionsList.toArray(array);
+ return array;
+ }
+
+ public void printPerNode()
+ {
+ for (int nodeId = 0; nodeId < numNodes; nodeId++)
+ {
+ for (int r = 0; r < replication; r++)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("(").append(nodeId).append(",").append(r).append("):\t");
+ Node node = nodes[nodeId];
+ LinkedList<Integer> linkedList = node.partitionLists.get(r);
+ for (int partitionId : linkedList)
+ {
+ sb.append(partitionId).append(",");
+ }
+ System.out.println(sb.toString());
+ }
+
+ }
+ }
+ }
+
+ static class Partition
+ {
+
+ final int partitionId;
+
+ public Partition(int partitionId, int replication)
+ {
+ this.partitionId = partitionId;
+ nodeIds = new int[replication];
+ Arrays.fill(nodeIds, -1);
+ }
+
+ int[] nodeIds;
+ }
+
+ static class Node
+ {
+ private final int replication;
+ ArrayList<LinkedList<Integer>> partitionLists;
+
+ public Node(int replication)
+ {
+ this.replication = replication;
+ partitionLists = new ArrayList<LinkedList<Integer>>(replication);
+ for (int i = 0; i < replication; i++)
+ {
+ partitionLists.add(new LinkedList<Integer>());
+ }
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ doAssignment(new int[] { 5 }, 120, 3);
+ }
+
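+ /**
+ * Assignment flow (a sketch of what the code below does): nodes[0] is the initial
+ * node count and later entries model nodes added afterwards; replica 0 of each
+ * partition is placed round-robin, then each higher replica is scattered randomly
+ * across the remaining nodes.
+ */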
+ private static void doAssignment(int[] nodes, int partitions, int replication)
+ {
+ int N = nodes[0];
+ int totalNodes = 0;
+ for (int temp : nodes)
+ {
+ totalNodes += temp;
+ }
+ Assignment assignment = new Assignment(totalNodes, partitions, replication);
+ int nodeId = 0;
+ for (int i = 0; i < partitions; i++)
+ {
+ assignment.assign(i, 0, nodeId);
+ nodeId = (nodeId + 1) % N;
+ }
+ Random random = new Random();
+ for (int r = 1; r < replication; r++)
+ {
+ for (int id = 0; id < N; id++)
+ {
+ Integer[] partitionsPerNode = assignment.getPartitionsPerNode(id, 0);
+ boolean[] used = new boolean[partitionsPerNode.length];
+ Arrays.fill(used, false);
+ System.out.println(id + "-" + partitionsPerNode.length);
+ nodeId = (id + r) % N;
+ int count = partitionsPerNode.length;
+ while (count > 0)
+ {
+ if (nodeId != id)
+ {
+ // pick a random not-yet-assigned partition of this node and place its
+ // replica r on nodeId; count decreases so the loop terminates
+ int nextInt = random.nextInt(count);
+ int temp = 0;
+ for (int b = 0; b < used.length; b++)
+ {
+ if (!used[b])
+ {
+ if (temp == nextInt)
+ {
+ assignment.assign(partitionsPerNode[b], r, nodeId);
+ used[b] = true;
+ count--;
+ break;
+ }
+ temp++;
+ }
+ }
+ }
+ nodeId = (nodeId + 1) % N;
+ }
+
+ }
+ }
+ if (nodes.length > 1)
+ {
+ int prevNodeCount = nodes[0];
+ for (int i = 1; i < nodes.length; i++)
+ {
+ int newNodeCount = prevNodeCount + nodes[i];
+ int masterPartitionsToMove =
+ (int) ((partitions * 1.0 / prevNodeCount - partitions * 1.0 / newNodeCount) * prevNodeCount);
+ // TODO move masterPartitionsToMove master partitions onto the newly added nodes
+ prevNodeCount = newNodeCount;
+
+ }
+ }
+ assignment.printPerNode();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZKDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZKDumper.java b/helix-core/src/main/java/org/apache/helix/tools/ZKDumper.java
new file mode 100644
index 0000000..279cf76
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZKDumper.java
@@ -0,0 +1,274 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.helix.manager.zk.ZkClient;
+
+
+/**
+ * Dumps the ZooKeeper file structure onto disk
+ *
+ * @author kgopalak
+ *
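+ * Example invocations (illustrative; substitute your own ZK address and paths):
+ *
+ *   java org.apache.helix.tools.ZKDumper --download -zkSvr localhost:2181 -zkpath /myCluster -fspath /tmp/dump
+ *   java org.apache.helix.tools.ZKDumper --upload --removeSuffix -zkSvr localhost:2181 -zkpath /myCluster -fspath /tmp/dump/myCluster
+ *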
+ */
+@SuppressWarnings("static-access")
+public class ZKDumper
+{
+ private ZkClient client;
+ private FilenameFilter filter;
+ static Options options;
+ private String suffix = "";
+ // enabled by default
+ private boolean removeSuffix = true;
+
+ public String getSuffix()
+ {
+ return suffix;
+ }
+
+ public void setSuffix(String suffix)
+ {
+ this.suffix = suffix;
+ }
+
+ public boolean isRemoveSuffix()
+ {
+ return removeSuffix;
+ }
+
+ public void setRemoveSuffix(boolean removeSuffix)
+ {
+ this.removeSuffix = removeSuffix;
+ }
+
+ static
+ {
+ options = new Options();
+ OptionGroup optionGroup = new OptionGroup();
+
+ Option d = OptionBuilder.withLongOpt("download")
+ .withDescription("Download from ZK to File System").create();
+ d.setArgs(0);
+ Option dSuffix = OptionBuilder.withLongOpt("addSuffix")
+ .withDescription("add suffix to every file downloaded from ZK").create();
+ dSuffix.setArgs(1);
+ dSuffix.setRequired(false);
+
+ Option u = OptionBuilder.withLongOpt("upload")
+ .withDescription("Upload from File System to ZK").create();
+ u.setArgs(0);
+ Option uSuffix = OptionBuilder.withLongOpt("removeSuffix")
+ .withDescription("remove suffix from every file uploaded to ZK").create();
+ uSuffix.setArgs(0);
+ uSuffix.setRequired(false);
+
+ Option del = OptionBuilder.withLongOpt("delete")
+ .withDescription("Delete given path from ZK").create();
+
+ optionGroup.setRequired(true);
+ optionGroup.addOption(del);
+ optionGroup.addOption(u);
+ optionGroup.addOption(d);
+ options.addOptionGroup(optionGroup);
+ options.addOption("zkSvr", true, "Zookeeper address");
+ options.addOption("zkpath", true, "Zookeeper path");
+ options.addOption("fspath", true, "Path on local Filesystem to dump");
+ options.addOption("h", "help", false, "Print this usage information");
+ options.addOption("v", "verbose", false, "Print out VERBOSE information");
+ options.addOption(dSuffix);
+ options.addOption(uSuffix);
+ }
+
+ public ZKDumper(String zkAddress)
+ {
+ client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+ ZkSerializer zkSerializer = new ZkSerializer()
+ {
+
+ @Override
+ public byte[] serialize(Object arg0) throws ZkMarshallingError
+ {
+ return arg0.toString().getBytes();
+ }
+
+ @Override
+ public Object deserialize(byte[] arg0) throws ZkMarshallingError
+ {
+ return new String(arg0);
+ }
+ };
+ client.setZkSerializer(zkSerializer);
+ filter = new FilenameFilter()
+ {
+
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return !name.startsWith(".");
+ }
+ };
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ if (args == null || args.length == 0)
+ {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.printHelp("java " + ZKDumper.class.getName(), options);
+ System.exit(1);
+ }
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+ cmd.hasOption("zkSvr");
+ boolean download = cmd.hasOption("download");
+ boolean upload = cmd.hasOption("upload");
+ boolean del = cmd.hasOption("delete");
+ String zkAddress = cmd.getOptionValue("zkSvr");
+ String zkPath = cmd.getOptionValue("zkpath");
+ String fsPath = cmd.getOptionValue("fspath");
+
+ ZKDumper zkDump = new ZKDumper(zkAddress);
+ if (download)
+ {
+ if (cmd.hasOption("addSuffix"))
+ {
+ zkDump.suffix = cmd.getOptionValue("addSuffix");
+ }
+ zkDump.download(zkPath, fsPath + zkPath);
+ }
+ if (upload)
+ {
+ if (cmd.hasOption("removeSuffix"))
+ {
+ zkDump.removeSuffix = true;
+ }
+ zkDump.upload(zkPath, fsPath);
+ }
+ if (del)
+ {
+ zkDump.delete(zkPath);
+ }
+ }
+
+ private void delete(String zkPath)
+ {
+ client.deleteRecursive(zkPath);
+
+ }
+
+ public void upload(String zkPath, String fsPath) throws Exception
+ {
+ File file = new File(fsPath);
+ System.out.println("Uploading " + file.getCanonicalPath() + " to " + zkPath);
+ zkPath = zkPath.replaceAll("[/]+", "/");
+ int index = -1;
+ if (removeSuffix && (index = file.getName().indexOf('.')) > -1)
+ {
+ // strip the trailing file suffix from the znode path without regex interpretation
+ String fileSuffix = file.getName().substring(index);
+ if (zkPath.endsWith(fileSuffix))
+ {
+ zkPath = zkPath.substring(0, zkPath.length() - fileSuffix.length());
+ }
+ }
+ if (file.isDirectory())
+ {
+ File[] children = file.listFiles(filter);
+ client.createPersistent(zkPath, true);
+ if (children != null && children.length > 0)
+ {
+
+ for (File child : children)
+ {
+ upload(zkPath + "/" + child.getName(), fsPath + "/" + child.getName());
+ }
+ } else
+ {
+
+ }
+ } else
+ {
+ BufferedReader bfr = null;
+ try
+ {
+ bfr = new BufferedReader(new FileReader(file));
+ StringBuilder sb = new StringBuilder();
+ String line;
+ String recordDelimiter = "";
+ while ((line = bfr.readLine()) != null)
+ {
+ sb.append(recordDelimiter).append(line);
+ recordDelimiter = "\n";
+ }
+ client.createPersistent(zkPath, sb.toString());
+ } finally
+ {
+ if (bfr != null)
+ {
+ try
+ {
+ bfr.close();
+ } catch (IOException e)
+ {
+ // ignore failure to close the reader
+ }
+ }
+ }
+ }
+ }
+
+ public void download(String zkPath, String fsPath) throws Exception
+ {
+
+ List<String> children = client.getChildren(zkPath);
+ if (children != null && children.size() > 0)
+ {
+ new File(fsPath).mkdirs();
+ for (String child : children)
+ {
+ String childPath = zkPath.equals("/")? "/" + child : zkPath + "/" + child;
+ download(childPath, fsPath + "/" + child);
+ }
+ } else
+ {
+ System.out.println("Saving " + zkPath + " to "
+ + new File(fsPath + suffix).getCanonicalPath());
+ FileWriter fileWriter = new FileWriter(fsPath + suffix);
+ Object readData = client.readData(zkPath);
+ if (readData != null)
+ {
+ fileWriter.write((String) readData);
+ }
+ fileWriter.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZKLogFormatter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZKLogFormatter.java b/helix-core/src/main/java/org/apache/helix/tools/ZKLogFormatter.java
new file mode 100644
index 0000000..ec5e0d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZKLogFormatter.java
@@ -0,0 +1,395 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import javax.xml.bind.annotation.adapters.HexBinaryAdapter;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.persistence.FileHeader;
+import org.apache.zookeeper.server.persistence.FileSnap;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+
+public class ZKLogFormatter
+{
+ private static final Logger LOG = Logger.getLogger(ZKLogFormatter.class);
+ private static DateFormat dateTimeInstance = DateFormat.getDateTimeInstance(
+ DateFormat.SHORT, DateFormat.LONG);
+ private static HexBinaryAdapter adapter = new HexBinaryAdapter();
+ private static String fieldDelim = ":";
+ private static String fieldSep = " ";
+
+ static BufferedWriter bw = null;
+ /**
+ * Formats a ZooKeeper transaction log or snapshot file as text.
+ *
+ * @param args [0] "log" or "snapshot"; [1] the log/snapshot file to read;
+ *             [2] optional output file (defaults to stdout)
+ */
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length != 2 && args.length != 3)
+ {
+ System.err.println("USAGE: LogFormatter <log|snapshot> log_file");
+ System.exit(2);
+ }
+
+ if (args.length == 3)
+ {
+ bw = new BufferedWriter(new FileWriter(new File(args[2])));
+ }
+
+ if (args[0].equals("log"))
+ {
+ readTransactionLog(args[1]);
+ } else if (args[0].equals("snapshot"))
+ {
+ readSnapshotLog(args[1]);
+ }
+
+ if (bw != null)
+ {
+ bw.close();
+ }
+ }
+
+ private static void readSnapshotLog(String snapshotPath) throws Exception
+ {
+ FileInputStream fis = new FileInputStream(snapshotPath);
+ BinaryInputArchive ia = BinaryInputArchive.getArchive(fis);
+ Map<Long, Integer> sessions = new HashMap<Long, Integer>();
+ DataTree dt = new DataTree();
+ FileHeader header = new FileHeader();
+ header.deserialize(ia, "fileheader");
+ if (header.getMagic() != FileSnap.SNAP_MAGIC)
+ {
+ throw new IOException("mismatching magic headers " + header.getMagic()
+ + " != " + FileSnap.SNAP_MAGIC);
+ }
+ SerializeUtils.deserializeSnapshot(dt, ia, sessions);
+
+ if (bw != null)
+ {
+ bw.write(sessions.toString());
+ bw.newLine();
+ } else
+ {
+ System.out.println(sessions);
+ }
+ traverse(dt, 1, "/");
+
+ }
+
+ /*
+ * Level order traversal
+ */
+ private static void traverse(DataTree dt, int startId, String startPath) throws Exception
+ {
+ LinkedList<Pair> queue = new LinkedList<Pair>();
+ queue.add(new Pair(startPath, startId));
+ while (!queue.isEmpty())
+ {
+ Pair pair = queue.removeFirst();
+ String path = pair._path;
+ DataNode head = dt.getNode(path);
+ Stat stat = new Stat();
+ byte[] data = null;
+ try
+ {
+ data = dt.getData(path, stat, null);
+ } catch (NoNodeException e)
+ {
+ e.printStackTrace();
+ }
+ // print the node
+ format(startId, pair, head, data);
+ Set<String> children = head.getChildren();
+ if (children != null)
+ {
+ for (String child : children)
+ {
+ String childPath;
+ if (path.endsWith("/"))
+ {
+ childPath = path + child;
+ } else
+ {
+ childPath = path + "/" + child;
+ }
+ queue.add(new Pair(childPath, startId));
+ }
+ }
+ startId = startId + 1;
+ }
+
+ }
+
+ static class Pair
+ {
+
+ private final String _path;
+ private final int _parentId;
+
+ public Pair(String path, int parentId)
+ {
+ _path = path;
+ _parentId = parentId;
+ }
+
+ }
+
+ private static void format(int id, Pair pair, DataNode head, byte[] data) throws Exception
+ {
+ String dataStr = "";
+ if (data != null)
+ {
+ dataStr = new String(data).replaceAll("[\\s]+", "");
+ }
+ StringBuffer sb = new StringBuffer();
+ //@formatter:off
+ sb.append("id").append(fieldDelim).append(id).append(fieldSep);
+ sb.append("parent").append(fieldDelim).append(pair._parentId).append(fieldSep);
+ sb.append("path").append(fieldDelim).append(pair._path).append(fieldSep);
+ sb.append("session").append(fieldDelim).append("0x" +Long.toHexString(head.stat.getEphemeralOwner())).append(fieldSep);
+ sb.append("czxid").append(fieldDelim).append("0x" +Long.toHexString(head.stat.getCzxid())).append(fieldSep);
+ sb.append("ctime").append(fieldDelim).append(head.stat.getCtime()).append(fieldSep);
+ sb.append("mtime").append(fieldDelim).append(head.stat.getMtime()).append(fieldSep);
+ sb.append("cmzxid").append(fieldDelim).append("0x" +Long.toHexString(head.stat.getMzxid())).append(fieldSep);
+ sb.append("pzxid").append(fieldDelim).append("0x" +Long.toHexString(head.stat.getPzxid())).append(fieldSep);
+ sb.append("aversion").append(fieldDelim).append(head.stat.getAversion()).append(fieldSep);
+ sb.append("cversion").append(fieldDelim).append(head.stat.getCversion()).append(fieldSep);
+ sb.append("version").append(fieldDelim).append(head.stat.getVersion()).append(fieldSep);
+ sb.append("data").append(fieldDelim).append(dataStr).append(fieldSep);
+ //@formatter:on
+
+ if (bw != null)
+ {
+ bw.write(sb.toString());
+ bw.newLine();
+ } else
+ {
+ System.out.println(sb);
+ }
+
+ }
+
+ private static void readTransactionLog(String logfilepath)
+ throws FileNotFoundException, IOException, EOFException
+ {
+ FileInputStream fis = new FileInputStream(logfilepath);
+ BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
+ FileHeader fhdr = new FileHeader();
+ fhdr.deserialize(logStream, "fileheader");
+
+ if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC)
+ {
+ System.err.println("Invalid magic number for " + logfilepath);
+ System.exit(2);
+ }
+
+ if (bw != null)
+ {
+ bw.write("ZooKeeper Transactional Log File with dbid "
+ + fhdr.getDbid() + " txnlog format version " + fhdr.getVersion());
+ bw.newLine();
+ } else
+ {
+ System.out.println("ZooKeeper Transactional Log File with dbid "
+ + fhdr.getDbid() + " txnlog format version " + fhdr.getVersion());
+ }
+
+
+ int count = 0;
+ while (true)
+ {
+ long crcValue;
+ byte[] bytes;
+ try
+ {
+ crcValue = logStream.readLong("crcvalue");
+
+ bytes = logStream.readBuffer("txnEntry");
+ } catch (EOFException e)
+ {
+ if (bw != null)
+ {
+ bw.write("EOF reached after " + count + " txns.");
+ bw.newLine();
+ } else
+ {
+ System.out.println("EOF reached after " + count + " txns.");
+ }
+
+ break;
+ }
+ if (bytes.length == 0)
+ {
+ // Since we preallocate, we define EOF to be an
+ // empty transaction
+ if (bw != null)
+ {
+ bw.write("EOF reached after " + count + " txns.");
+ bw.newLine();
+ } else
+ {
+ System.out.println("EOF reached after " + count + " txns.");
+ }
+
+ return;
+ }
+ Checksum crc = new Adler32();
+ crc.update(bytes, 0, bytes.length);
+ if (crcValue != crc.getValue())
+ {
+ throw new IOException("CRC doesn't match " + crcValue + " vs "
+ + crc.getValue());
+ }
+ InputArchive iab = BinaryInputArchive
+ .getArchive(new ByteArrayInputStream(bytes));
+ TxnHeader hdr = new TxnHeader();
+ Record txn = SerializeUtils.deserializeTxn(iab, hdr);
+ if (bw != null)
+ {
+ bw.write(formatTransaction(hdr, txn));
+ bw.newLine();
+ } else
+ {
+ System.out.println(formatTransaction(hdr, txn));
+ }
+
+ if (logStream.readByte("EOR") != 'B')
+ {
+ LOG.error("Last transaction was partial.");
+ throw new EOFException("Last transaction was partial.");
+ }
+ count++;
+ }
+ }
+
+ static String op2String(int op)
+ {
+ switch (op)
+ {
+ case OpCode.notification:
+ return "notification";
+ case OpCode.create:
+ return "create";
+ case OpCode.delete:
+ return "delete";
+ case OpCode.exists:
+ return "exists";
+ case OpCode.getData:
+ return "getDate";
+ case OpCode.setData:
+ return "setData";
+ case OpCode.getACL:
+ return "getACL";
+ case OpCode.setACL:
+ return "setACL";
+ case OpCode.getChildren:
+ return "getChildren";
+ case OpCode.getChildren2:
+ return "getChildren2";
+ case OpCode.ping:
+ return "ping";
+ case OpCode.createSession:
+ return "createSession";
+ case OpCode.closeSession:
+ return "closeSession";
+ case OpCode.error:
+ return "error";
+ default:
+ return "unknown " + op;
+ }
+ }
+
+ private static String formatTransaction(TxnHeader header, Record txn)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("time").append(fieldDelim).append(header.getTime());
+ sb.append(fieldSep).append("session").append(fieldDelim).append("0x")
+ .append(Long.toHexString(header.getClientId()));
+ sb.append(fieldSep).append("cxid").append(fieldDelim).append("0x")
+ .append(Long.toHexString(header.getCxid()));
+ sb.append(fieldSep).append("zxid").append(fieldDelim).append("0x")
+ .append(Long.toHexString(header.getZxid()));
+ sb.append(fieldSep).append("type").append(fieldDelim)
+ .append(op2String(header.getType()));
+ if (txn != null)
+ {
+ try
+ {
+ byte[] data = null;
+ for (PropertyDescriptor pd : Introspector.getBeanInfo(txn.getClass())
+ .getPropertyDescriptors())
+ {
+ if (pd.getName().equalsIgnoreCase("data"))
+ {
+ data = (byte[]) pd.getReadMethod().invoke(txn);
+ continue;
+ }
+ if (pd.getReadMethod() != null && !"class".equals(pd.getName()))
+ {
+ sb.append(fieldSep)
+ .append(pd.getDisplayName())
+ .append(fieldDelim)
+ .append(
+ pd.getReadMethod().invoke(txn).toString()
+ .replaceAll("[\\s]+", ""));
+ }
+ }
+ if (data != null)
+ {
+ sb.append(fieldSep).append("data").append(fieldDelim)
+ .append(new String(data).replaceAll("[\\s]+", ""));
+ }
+ } catch (Exception e)
+ {
+ LOG.error(
+ "Error while retrieving bean property values for " + txn.getClass(),
+ e);
+ }
+ }
+
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
new file mode 100644
index 0000000..010ec17
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
@@ -0,0 +1,491 @@
+package org.apache.helix.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.log4j.Logger;
+
+
+public class ZkLogAnalyzer
+{
+ private static Logger LOG = Logger.getLogger(ZkLogAnalyzer.class);
+ private static boolean dump = false;
+ final static ZNRecordSerializer _deserializer = new ZNRecordSerializer();
+
+ static class Stats
+ {
+ int msgSentCount = 0;
+ int msgSentCount_O2S = 0; // Offline to Slave
+ int msgSentCount_S2M = 0; // Slave to Master
+ int msgSentCount_M2S = 0; // Master to Slave
+ int msgDeleteCount = 0;
+ int msgModifyCount = 0;
+ int curStateCreateCount = 0;
+ int curStateUpdateCount = 0;
+ int extViewCreateCount = 0;
+ int extViewUpdateCount = 0;
+ }
+
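+ // Scans whitespace-separated "name:value" tokens and returns the value for the
+ // given attribute prefix. Example (hypothetical log line):
+ //   getAttributeValue("time:123 session:0xab type:create", "time:") returns "123"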
+ static String getAttributeValue(String line, String attribute)
+ {
+ if (line == null)
+ return null;
+ String[] parts = line.split("\\s");
+ if (parts != null && parts.length > 0)
+ {
+ for (int i = 0; i < parts.length; i++)
+ {
+ if (parts[i].startsWith(attribute))
+ {
+ String val = parts[i].substring(attribute.length());
+ return val;
+ }
+ }
+ }
+ return null;
+ }
+
+ static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end)
+ {
+ long lastCSUpdateTimestamp = Long.MIN_VALUE;
+ String lastCSUpdateLine = null;
+ for (String line : csUpdateLines)
+ {
+ // ZNRecord record = getZNRecord(line);
+ long timestamp = Long.parseLong(getAttributeValue(line, "time:"));
+ if (timestamp >= start && timestamp <= end && timestamp > lastCSUpdateTimestamp)
+ {
+ lastCSUpdateTimestamp = timestamp;
+ lastCSUpdateLine = line;
+ }
+ }
+ assert (lastCSUpdateLine != null) : "No CS update between " + start + " - " + end;
+ return lastCSUpdateLine;
+ }
+
+ static ZNRecord getZNRecord(String line)
+ {
+ ZNRecord record = null;
+ String value = getAttributeValue(line, "data:");
+ if (value != null)
+ {
+ record = (ZNRecord) _deserializer.deserialize(value.getBytes());
+ // if (record == null)
+ // {
+ // System.out.println(line);
+ // }
+ }
+ return record;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length != 3)
+ {
+ System.err.println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_hhmmss_SSS)");
+ System.exit(1);
+ }
+
+ System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args));
+ // get create-timestamp of "/" + clusterName
+ // find all zk logs after that create-timestamp and parse them
+ // save parsed log in /tmp/zkLogAnalyzor_zklog.parsed0,1,2...
+
+ String zkLogDir = args[0];
+ String clusterName = args[1];
+ // String zkAddr = args[2];
+ String startTimeStr = args[2];
+ // ZkClient zkClient = new ZkClient(zkAddr);
+ // Stat clusterCreateStat = zkClient.getStat("/" + clusterName);
+ SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
+ Date date = formatter.parse(startTimeStr);
+ long startTimeStamp = date.getTime();
+
+ System.out.println(clusterName + " created at " + date);
+ while (zkLogDir.endsWith("/"))
+ {
+ zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1);
+ }
+ if (!zkLogDir.endsWith("/version-2"))
+ {
+ zkLogDir = zkLogDir + "/version-2";
+ }
+ File dir = new File(zkLogDir);
+ File[] zkLogs = dir.listFiles(new FileFilter()
+ {
+
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isFile() && (file.getName().indexOf("log") != -1);
+ }
+ });
+
+ // lastModified time -> zkLog
+ TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>();
+ for (File file : zkLogs)
+ {
+ if (file.lastModified() > startTimeStamp)
+ {
+ lastZkLogs.put(file.lastModified(), file.getAbsolutePath());
+ }
+ }
+
+ List<String> parsedZkLogs = new ArrayList<String>();
+ int i = 0;
+ System.out.println("zk logs last modified later than "
+ + new Timestamp(startTimeStamp));
+ for (Long lastModified : lastZkLogs.keySet())
+ {
+ String fileName = lastZkLogs.get(lastModified);
+ System.out.println(new Timestamp(lastModified) + ": "
+ + (fileName.substring(fileName.lastIndexOf('/') + 1)));
+
+ String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i;
+ i++;
+ ZKLogFormatter.main(new String[] { "log", fileName, parsedFileName });
+ parsedZkLogs.add(parsedFileName);
+ }
+
+ // sessionId -> create liveInstance line
+ Map<String, String> sessionMap = new HashMap<String, String>();
+
+ // message send lines in time order
+ // List<String> sendMessageLines = new ArrayList<String>();
+
+ // CS update lines in time order
+ List<String> csUpdateLines = new ArrayList<String>();
+
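+ // session id of the current controller leader; used to attribute
+ // EXTERNALVIEW writes and STATE_TRANSITION messages to the controller
+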
+ String leaderSession = null;
+
+ System.out.println();
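+ // lastTestStartTimestamp marks the earliest test action (cluster start,
+ // node kill, or partition disable) since the last verify; state transition
+ // latency is measured from it to the last current-state update before verify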
+ Stats stats = new Stats();
+ long lastTestStartTimestamp = Long.MAX_VALUE;
+ long controllerStartTime = 0;
+ for (String parsedZkLog : parsedZkLogs)
+ {
+
+ FileInputStream fis = new FileInputStream(parsedZkLog);
+ BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+
+ String inputLine;
+ while ((inputLine = br.readLine()) != null)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ if (timestamp == null)
+ {
+ continue;
+ }
+ long timestampVal = Long.parseLong(timestamp);
+ if (timestampVal < startTimeStamp)
+ {
+ continue;
+ }
+
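+ // once dumping is enabled (on /start_disable or a tracked session close),
+ // echo every subsequent line, minus its data payload, to stderr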
+ if (dump)
+ {
+ String printLine = inputLine.replaceAll("data:.*", "");
+ printLine = new Timestamp(timestampVal) + " " + printLine.substring(printLine.indexOf("session:"));
+ System.err.println(printLine);
+ }
+
+ if (inputLine.indexOf("/start_disable") != -1)
+ {
+ dump = true;
+ }
+ if (inputLine.indexOf("/" + clusterName + "/CONFIGS/CLUSTER/verify") != -1)
+ {
+ String type = getAttributeValue(inputLine, "type:");
+ if (type.equals("delete"))
+ {
+ System.out.println(timestamp + ": verify done");
+ System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp);
+ String lastCSUpdateLine =
+ findLastCSUpdateBetween(csUpdateLines,
+ lastTestStartTimestamp,
+ timestampVal);
+ long lastCSUpdateTimestamp =
+ Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:"));
+ System.out.println("Last CS Update:" + lastCSUpdateTimestamp);
+
+ System.out.println("state transition latency: "
+ + +(lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms");
+
+ System.out.println("state transition latency since controller start: "
+ + +(lastCSUpdateTimestamp - controllerStartTime) + "ms");
+
+ System.out.println("Create MSG\t" + stats.msgSentCount + "\t"
+ + stats.msgSentCount_O2S + "\t" + stats.msgSentCount_S2M + "\t"
+ + stats.msgSentCount_M2S);
+ System.out.println("Modify MSG\t" + stats.msgModifyCount);
+ System.out.println("Delete MSG\t" + stats.msgDeleteCount);
+ System.out.println("Create CS\t" + stats.curStateCreateCount);
+ System.out.println("Update CS\t" + stats.curStateUpdateCount);
+ System.out.println("Create EV\t" + stats.extViewCreateCount);
+ System.out.println("Update EV\t" + stats.extViewUpdateCount);
+
+ System.out.println();
+ stats = new Stats();
+ lastTestStartTimestamp = Long.MAX_VALUE;
+ }
+ }
+ else if (inputLine.indexOf("/" + clusterName + "/LIVEINSTANCES/") != -1)
+ {
+ // cluster startup
+ if (timestampVal < lastTestStartTimestamp)
+ {
+ System.out.println("START cluster. SETTING lastTestStartTimestamp to "
+ + new Timestamp(timestampVal) + "\nline:" + inputLine);
+ lastTestStartTimestamp = timestampVal;
+ }
+
+ ZNRecord record = getZNRecord(inputLine);
+ LiveInstance liveInstance = new LiveInstance(record);
+ String session = getAttributeValue(inputLine, "session:");
+ sessionMap.put(session, inputLine);
+ System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LIVEINSTANCE "
+ + liveInstance.getInstanceName());
+ }
+ else if (inputLine.indexOf("closeSession") != -1)
+ {
+ // kill any instance
+ String session = getAttributeValue(inputLine, "session:");
+ if (sessionMap.containsKey(session))
+ {
+ if (timestampVal < lastTestStartTimestamp)
+ {
+ System.out.println("KILL node. SETTING lastTestStartTimestamp to " + timestampVal
+ + " line:" + inputLine);
+ lastTestStartTimestamp = timestampVal;
+ }
+ String line = sessionMap.get(session);
+ ZNRecord record = getZNRecord(line);
+ LiveInstance liveInstance = new LiveInstance(record);
+
+ System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": close session "
+ + liveInstance.getInstanceName());
+ dump = true;
+ }
+ }
+ else if (inputLine.indexOf("/" + clusterName + "/CONFIGS/PARTICIPANT") != -1)
+ {
+ // disable a partition
+ String type = getAttributeValue(inputLine, "type:");
+ if (type.equals("setData") && inputLine.indexOf("HELIX_DISABLED_PARTITION") != -1)
+ {
+ if (timestampVal < lastTestStartTimestamp)
+ {
+ System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to " + timestampVal
+ + " line:" + inputLine);
+ lastTestStartTimestamp = timestampVal;
+ }
+ }
+ }
+ else if (inputLine.indexOf("/" + clusterName + "/CONTROLLER/LEADER") != -1)
+ {
+ // leaderLine = inputLine;
+ ZNRecord record = getZNRecord(inputLine);
+ LiveInstance liveInstance = new LiveInstance(record);
+ String session = getAttributeValue(inputLine, "session:");
+ leaderSession = session;
+ controllerStartTime = timestampVal;
+ sessionMap.put(session, inputLine);
+ System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LEADER "
+ + liveInstance.getInstanceName());
+ }
+ else if (inputLine.indexOf("/" + clusterName + "/") != -1
+ && inputLine.indexOf("/CURRENTSTATES/") != -1)
+ {
+ String type = getAttributeValue(inputLine, "type:");
+ if (type.equals("create"))
+ {
+ stats.curStateCreateCount++;
+ }
+ else if (type.equals("setData"))
+ {
+ String path = getAttributeValue(inputLine, "path:");
+ csUpdateLines.add(inputLine);
+ stats.curStateUpdateCount++;
+ // getAttributeValue(line, "data");
+ System.out.println("Update currentstate:"
+ + new Timestamp(Long.parseLong(timestamp)) + ":" + timestamp + " path:"
+ + path);
+ }
+ }
+ else if (inputLine.indexOf("/" + clusterName + "/EXTERNALVIEW/") != -1)
+ {
+ String session = getAttributeValue(inputLine, "session:");
+ if (session != null && session.equals(leaderSession))
+ {
+ String type = getAttributeValue(inputLine, "type:");
+ if (type.equals("create"))
+ {
+ stats.extViewCreateCount++;
+ }
+ else if (type.equals("setData"))
+ {
+ stats.extViewUpdateCount++;
+ }
+ }
+
+ // pos = inputLine.indexOf("EXTERNALVIEW");
+ // pos = inputLine.indexOf("data:{", pos);
+ // if (pos != -1)
+ // {
+ // String timestamp = getAttributeValue(inputLine, "time:");
+ // ZNRecord record =
+ // (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ // .getBytes());
+ // ExternalView extView = new ExternalView(record);
+ // int masterCnt = ClusterStateVerifier.countStateNbInExtView(extView,
+ // "MASTER");
+ // int slaveCnt = ClusterStateVerifier.countStateNbInExtView(extView, "SLAVE");
+ // if (masterCnt == 1200)
+ // {
+ // System.out.println(timestamp + ": externalView " + extView.getResourceName()
+ // + " has " + masterCnt + " MASTER, " + slaveCnt + " SLAVE");
+ // }
+ // }
+ }
+ else if (inputLine.indexOf("/" + clusterName + "/") != -1
+ && inputLine.indexOf("/MESSAGES/") != -1)
+ {
+ String type = getAttributeValue(inputLine, "type:");
+
+ if (type.equals("create"))
+ {
+ ZNRecord record = getZNRecord(inputLine);
+ Message msg = new Message(record);
+ String sendSession = getAttributeValue(inputLine, "session:");
+ if (sendSession != null && sendSession.equals(leaderSession)
+ && msg.getMsgType().equals("STATE_TRANSITION")
+ && msg.getMsgState() == MessageState.NEW)
+ {
+ // sendMessageLines.add(inputLine);
+ stats.msgSentCount++;
+
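+ // break down controller-sent state-transition messages by transition type
+ // (OFFLINE->SLAVE, SLAVE->MASTER, MASTER->SLAVE)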
+ if (msg.getFromState().equals("OFFLINE")
+ && msg.getToState().equals("SLAVE"))
+ {
+ stats.msgSentCount_O2S++;
+ }
+ else if (msg.getFromState().equals("SLAVE")
+ && msg.getToState().equals("MASTER"))
+ {
+ stats.msgSentCount_S2M++;
+ }
+ else if (msg.getFromState().equals("MASTER")
+ && msg.getToState().equals("SLAVE"))
+ {
+ stats.msgSentCount_M2S++;
+ }
+ // System.out.println("Message create:"+new
+ // Timestamp(Long.parseLong(timestamp)));
+ }
+
+ // pos = inputLine.indexOf("MESSAGES");
+ // pos = inputLine.indexOf("data:{", pos);
+ // if (pos != -1)
+ // {
+ //
+ // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
+ // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
+ // Message msg = new Message(record);
+ // MessageState msgState = msg.getMsgState();
+ // String msgType = msg.getMsgType();
+ // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.NEW)
+ // {
+ // if (!msgs.containsKey(msg.getMsgId()))
+ // {
+ // msgs.put(msg.getMsgId(), new MsgItem(Long.parseLong(timestamp), msg));
+ // }
+ // else
+ // {
+ // LOG.error("msg: " + msg.getMsgId() + " already sent");
+ // }
+ //
+ // System.out.println(timestamp + ": sendMsg " + msg.getPartitionName() + "("
+ // + msg.getFromState() + "->" + msg.getToState() + ") to "
+ // + msg.getTgtName() + ", size: " + msgBytes.length);
+ // }
+ // }
+ }
+ else if (type.equals("setData"))
+ {
+ stats.msgModifyCount++;
+ // pos = inputLine.indexOf("MESSAGES");
+ // pos = inputLine.indexOf("data:{", pos);
+ // if (pos != -1)
+ // {
+ //
+ // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
+ // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
+ // Message msg = new Message(record);
+ // MessageState msgState = msg.getMsgState();
+ // String msgType = msg.getMsgType();
+ // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.READ)
+ // {
+ // if (!msgs.containsKey(msg.getMsgId()))
+ // {
+ // LOG.error("msg: " + msg.getMsgId() + " never sent");
+ // }
+ // else
+ // {
+ // MsgItem msgItem = msgs.get(msg.getMsgId());
+ // if (msgItem.readTime == 0)
+ // {
+ // msgItem.readTime = Long.parseLong(timestamp);
+ // msgs.put(msg.getMsgId(), msgItem);
+ // // System.out.println(timestamp + ": readMsg " + msg.getPartitionName()
+ // // + "("
+ // // + msg.getFromState() + "->" + msg.getToState() + ") to "
+ // // + msg.getTgtName() + ", latency: " + (msgItem.readTime -
+ // // msgItem.sendTime));
+ // }
+ // }
+ //
+ // }
+ // }
+ }
+ else if (type.equals("delete"))
+ {
+ stats.msgDeleteCount++;
+ // String msgId = path.substring(path.lastIndexOf('/') + 1);
+ // if (msgs.containsKey(msgId))
+ // {
+ // MsgItem msgItem = msgs.get(msgId);
+ // Message msg = msgItem.msg;
+ // msgItem.deleteTime = Long.parseLong(timestamp);
+ // msgs.put(msgId, msgItem);
+ // msgItem.latency = msgItem.deleteTime - msgItem.sendTime;
+ // System.out.println(timestamp + ": delMsg " + msg.getPartitionName() + "("
+ // + msg.getFromState() + "->" + msg.getToState() + ") to "
+ // + msg.getTgtName() + ", latency: " + msgItem.latency);
+ // }
+ // else
+ // {
+ // // messages other than STATE_TRANSITION message
+ // // LOG.error("msg: " + msgId + " never sent");
+ // }
+ }
+ }
+ } // end of while ((inputLine = br.readLine()) != null)
+ br.close();
+ }
+ }
+}