You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[27/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/TestExecutor.java b/helix-core/src/main/java/com/linkedin/helix/tools/TestExecutor.java
deleted file mode 100644
index 02144ce..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/TestExecutor.java
+++ /dev/null
@@ -1,1029 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.store.PropertyJsonComparator;
-import com.linkedin.helix.store.PropertyJsonSerializer;
-import com.linkedin.helix.store.PropertyStoreException;
-import com.linkedin.helix.tools.TestCommand.CommandType;
-
-/**
- * A test is structured logically as a list of commands. Each command has three parts:
- * COMMAND | TRIGGER | ARGS. COMMAND is one of: modify, verify, start, stop.
- *
- * TRIGGER is optional and consists of start-time, timeout, and expect-value, which means
- * the COMMAND is attempted within [start-time, start-time + timeout] and takes effect
- * when the value in question equals expect-value.
- *
- * The format of ARGS depends on COMMAND. If COMMAND is modify/verify, the argument has the
- * form <znode-path, property-type (SIMPLE, LIST, MAP, or ZNODE), operation (+, -, ==, !=),
- * key, update-value>, in which key is k1 for SIMPLE, k1 or k1/index for LIST, and k1 or
- * k1/k2 for MAP fields. If COMMAND is start/stop, the argument is a thread handle.
- *
- * @author zzhang
- *
- */
-
-public class TestExecutor
-{
- /**
- * Kind of znode property a test command operates on.
- * SIMPLE: a simple (single-valued) field; LIST: a list field; MAP: a map field;
- * ZNODE: the entire znode.
- */
- public enum ZnodePropertyType
- {
- SIMPLE, LIST, MAP, ZNODE
- }
-
- /**
- * Shape of the value addressed by a (property type, key) pair, as computed by
- * getValueType: a single string, a whole list, a whole map, or a whole znode.
- * INVALID is the result when the key is null or malformed for the property type.
- */
- private enum ZnodeModValueType
- {
- INVALID, SINGLE_VALUE, LIST_VALUE, MAP_VALUE, ZNODE_VALUE
- }
-
- // Shared logger; final so it cannot be reassigned by accident.
- private static final Logger logger = Logger.getLogger(TestExecutor.class);
-
- // Pause between two attempts of the same command, in milliseconds.
- private static final long SLEEP_TIME = 500;
-
- // Comparator used for every string-to-string comparison of actual vs. expected values.
- private static final PropertyJsonComparator<String> STRING_COMPARATOR =
-     new PropertyJsonComparator<String>(String.class);
-
- // Serializer used to deep-copy ZNRecords via a serialize/deserialize round trip.
- private static final PropertyJsonSerializer<ZNRecord> ZNRECORD_SERIALIZER =
-     new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
-
- /**
-  * Determines what shape of value a (property type, key) pair addresses.
-  *
-  * Keys are split on "/": a one-part key addresses a whole simple/list/map field, a
-  * two-part key addresses a single list element (key1/index) or a single map entry
-  * (key1/key2). For ZNODE the key is ignored.
-  *
-  * @param type property type the command operates on
-  * @param key  field key, possibly with a second "/"-separated part
-  * @return the addressed value shape, or INVALID (after logging a warning) when the key
-  *         is null or malformed for the given type
-  */
- private static ZnodeModValueType getValueType(ZnodePropertyType type, String key)
- {
-   switch (type)
-   {
-   case SIMPLE:
-     return valueTypeForSimpleKey(key);
-   case LIST:
-     return valueTypeForListKey(key);
-   case MAP:
-     return valueTypeForMapKey(key);
-   case ZNODE:
-     // explicit return: the original fell through into default here
-     return ZnodeModValueType.ZNODE_VALUE;
-   default:
-     return ZnodeModValueType.INVALID;
-   }
- }
-
- // A simple-field key must be exactly one part (no slash).
- private static ZnodeModValueType valueTypeForSimpleKey(String key)
- {
-   if (key == null)
-   {
-     logger.warn("invalid key for simple field: key is null");
-     return ZnodeModValueType.INVALID;
-   }
-   if (key.split("/").length != 1)
-   {
-     logger.warn("invalid key for simple field: " + key
-         + ", expect 1 part: key1 (no slash)");
-     return ZnodeModValueType.INVALID;
-   }
-   return ZnodeModValueType.SINGLE_VALUE;
- }
-
- // A list-field key is either key1 (whole list) or key1/index with a non-negative index.
- private static ZnodeModValueType valueTypeForListKey(String key)
- {
-   if (key == null)
-   {
-     logger.warn("invalid key for list field: key is null");
-     return ZnodeModValueType.INVALID;
-   }
-   String[] keyParts = key.split("/");
-   if (keyParts.length < 1 || keyParts.length > 2)
-   {
-     logger.warn("invalid key for list field: " + key
-         + ", expect 1 or 2 parts: key1 or key1/index)");
-     return ZnodeModValueType.INVALID;
-   }
-   if (keyParts.length == 1)
-   {
-     return ZnodeModValueType.LIST_VALUE;
-   }
-   try
-   {
-     if (Integer.parseInt(keyParts[1]) < 0)
-     {
-       logger.warn("invalid key for list field: " + key + ", index < 0");
-       return ZnodeModValueType.INVALID;
-     }
-     return ZnodeModValueType.SINGLE_VALUE;
-   }
-   catch (NumberFormatException e)
-   {
-     logger.warn("invalid key for list field: " + key
-         + ", part-2 is NOT an integer");
-     return ZnodeModValueType.INVALID;
-   }
- }
-
- // A map-field key is either key1 (whole map) or key1/key2 (single entry).
- private static ZnodeModValueType valueTypeForMapKey(String key)
- {
-   if (key == null)
-   {
-     logger.warn("invalid key for map field: key is null");
-     return ZnodeModValueType.INVALID;
-   }
-   String[] keyParts = key.split("/");
-   if (keyParts.length < 1 || keyParts.length > 2)
-   {
-     logger.warn("invalid key for map field: " + key
-         + ", expect 1 or 2 parts: key1 or key1/key2)");
-     return ZnodeModValueType.INVALID;
-   }
-   return (keyParts.length == 1) ? ZnodeModValueType.MAP_VALUE
-       : ZnodeModValueType.SINGLE_VALUE;
- }
-
- /**
-  * Reads the single string value addressed by key from record.
-  *
-  * For LIST and MAP the key must have two "/"-separated parts (key1/index or key1/key2);
-  * callers in this class establish that via getValueType before calling.
-  *
-  * @param record record to read from; may be null
-  * @param type   property type the key refers to
-  * @param key    field key; may be null
-  * @return the addressed value, or null when record or key is null, the list/map field
-  *         does not exist, or the list index is beyond the current end of the list
-  */
- private static String getSingleValue(ZNRecord record, ZnodePropertyType type, String key)
- {
-   if (record == null || key == null)
-   {
-     return null;
-   }
-
-   String[] keyParts = key.split("/");
-   switch (type)
-   {
-   case SIMPLE:
-     return record.getSimpleField(key);
-   case LIST:
-   {
-     List<String> list = record.getListField(keyParts[0]);
-     if (list == null)
-     {
-       logger.warn("invalid key for list field: " + key
-           + ", list for key part-1 doesn't exist");
-       return null;
-     }
-     int idx = Integer.parseInt(keyParts[1]);
-     if (idx < 0 || idx >= list.size())
-     {
-       // An element that does not exist (yet) reads as absent instead of throwing, so a
-       // polling verifier keeps retrying rather than aborting on IndexOutOfBounds.
-       logger.debug("list field " + keyParts[0] + " has no element at index " + idx);
-       return null;
-     }
-     return list.get(idx);
-   }
-   case MAP:
-   {
-     Map<String, String> map = record.getMapField(keyParts[0]);
-     if (map == null)
-     {
-       logger.warn("invalid key for map field: " + key
-           + ", map for key part-1 doesn't exist");
-       return null;
-     }
-     return map.get(keyParts[1]);
-   }
-   default:
-     return null;
-   }
- }
-
- /**
-  * Looks up a whole list field.
-  *
-  * @return the list stored under key, or null when record is null
-  */
- private static List<String> getListValue(ZNRecord record, String key)
- {
-   return (record != null) ? record.getListField(key) : null;
- }
-
- /**
-  * Looks up a whole map field.
-  *
-  * A null record (e.g. the znode does not exist) yields null instead of a
-  * NullPointerException, matching getListValue.
-  *
-  * @return the map stored under key, or null when record is null
-  */
- private static Map<String, String> getMapValue(ZNRecord record, String key)
- {
-   if (record == null)
-   {
-     return null;
-   }
-   return record.getMapField(key);
- }
-
- // comparators for single/list/map values
-
- /**
-  * Compares one actual string against its expected value.
-  *
-  * When diff is non-null, both values are always recorded under key + "/expect" and
-  * key + "/actual", whether or not they match.
-  *
-  * @return true iff STRING_COMPARATOR considers the two values equal
-  */
- private static boolean compareSingleValue(String actual,
-     String expect,
-     String key,
-     ZNRecord diff)
- {
-   final int cmp = STRING_COMPARATOR.compare(actual, expect);
-
-   if (diff != null)
-   {
-     diff.setSimpleField(key + "/expect", expect);
-     diff.setSimpleField(key + "/actual", actual);
-   }
-   return cmp == 0;
- }
-
- /**
-  * Compares two lists element by element, in order.
-  *
-  * Lists are equal only if they have the same length and every pair of elements at the
-  * same position compares equal. (Previously a list that was a strict prefix of the other
-  * was reported as equal.)
-  *
-  * When diff is non-null, mismatching elements and the surplus tail of the longer list are
-  * appended to the list fields key + "/expect" and key + "/actual".
-  *
-  * @param actualList list read from the znode; may be null
-  * @param expectList expected list; may be null
-  * @param key        field key, used as prefix of the diff entries
-  * @param diff       record collecting differences; may be null
-  * @return true iff both lists are null or both are element-wise equal
-  */
- private static boolean compareListValue(List<String> actualList,
-     List<String> expectList,
-     String key,
-     ZNRecord diff)
- {
-   if (actualList == null && expectList == null)
-   {
-     return true;
-   }
-   if (actualList == null)
-   {
-     if (diff != null)
-     {
-       diff.setListField(key + "/expect", expectList);
-     }
-     return false;
-   }
-   if (expectList == null)
-   {
-     if (diff != null)
-     {
-       diff.setListField(key + "/actual", actualList);
-     }
-     return false;
-   }
-
-   final String expectKey = key + "/expect";
-   final String actualKey = key + "/actual";
-   if (diff != null && diff.getListField(expectKey) == null)
-   {
-     diff.setListField(expectKey, new ArrayList<String>());
-   }
-   if (diff != null && diff.getListField(actualKey) == null)
-   {
-     diff.setListField(actualKey, new ArrayList<String>());
-   }
-
-   boolean ret = true;
-   Iterator<String> itrActual = actualList.iterator();
-   Iterator<String> itrExpect = expectList.iterator();
-   while (itrActual.hasNext() && itrExpect.hasNext())
-   {
-     String actual = itrActual.next();
-     String expect = itrExpect.next();
-     if (STRING_COMPARATOR.compare(actual, expect) != 0)
-     {
-       ret = false;
-       if (diff != null)
-       {
-         diff.getListField(expectKey).add(expect);
-         diff.getListField(actualKey).add(actual);
-       }
-     }
-   }
-
-   // Surplus elements on either side mean the lists differ in length.
-   while (itrActual.hasNext())
-   {
-     String actual = itrActual.next();
-     ret = false;
-     if (diff != null)
-     {
-       diff.getListField(actualKey).add(actual);
-     }
-   }
-   while (itrExpect.hasNext())
-   {
-     String expect = itrExpect.next();
-     ret = false;
-     if (diff != null)
-     {
-       diff.getListField(expectKey).add(expect);
-     }
-   }
-   return ret;
- }
-
- /**
-  * Puts key2 -> value into the map field key1 of record, creating the map field (as a
-  * TreeMap) first if it does not exist yet.
-  */
- private static void setMapField(ZNRecord record, String key1, String key2, String value)
- {
-   Map<String, String> inner = record.getMapField(key1);
-   if (inner == null)
-   {
-     record.setMapField(key1, new TreeMap<String, String>());
-     // re-read so we write into whatever map the record actually holds
-     inner = record.getMapField(key1);
-   }
-   inner.put(key2, value);
- }
-
- /**
-  * Compares two string maps entry by entry.
-  *
-  * When diff is non-null, entries that are missing on one side or whose values differ are
-  * recorded in the map fields mapKey + "/actual" and mapKey + "/expect".
-  *
-  * @param actualMap map read from the znode; may be null
-  * @param expectMap expected map; may be null
-  * @param mapKey    field key, used as prefix of the diff entries
-  * @param diff      record collecting differences; may be null
-  * @return true iff both maps are null, or both hold the same keys with equal values
-  */
- private static boolean compareMapValue(Map<String, String> actualMap,
-     Map<String, String> expectMap,
-     String mapKey,
-     ZNRecord diff)
- {
-   if (actualMap == null && expectMap == null)
-   {
-     return true;
-   }
-   if (actualMap == null)
-   {
-     if (diff != null)
-     {
-       diff.setMapField(mapKey + "/expect", expectMap);
-     }
-     return false;
-   }
-   if (expectMap == null)
-   {
-     if (diff != null)
-     {
-       diff.setMapField(mapKey + "/actual", actualMap);
-     }
-     return false;
-   }
-
-   boolean same = true;
-
-   // pass 1: walk the actual entries
-   for (Map.Entry<String, String> entry : actualMap.entrySet())
-   {
-     final String k = entry.getKey();
-     final String actual = entry.getValue();
-     if (expectMap.containsKey(k))
-     {
-       final String expect = expectMap.get(k);
-       if (STRING_COMPARATOR.compare(actual, expect) != 0)
-       {
-         same = false;
-         if (diff != null)
-         {
-           setMapField(diff, mapKey + "/actual", k, actual);
-           setMapField(diff, mapKey + "/expect", k, expect);
-         }
-       }
-     }
-     else
-     {
-       same = false;
-       if (diff != null)
-       {
-         setMapField(diff, mapKey + "/actual", k, actual);
-       }
-     }
-   }
-
-   // pass 2: walk the expected entries
-   for (Map.Entry<String, String> entry : expectMap.entrySet())
-   {
-     final String k = entry.getKey();
-     final String expect = entry.getValue();
-     if (actualMap.containsKey(k))
-     {
-       final String actual = actualMap.get(k);
-       if (STRING_COMPARATOR.compare(actual, expect) != 0)
-       {
-         same = false;
-         if (diff != null)
-         {
-           setMapField(diff, mapKey + "/actual", k, actual);
-           setMapField(diff, mapKey + "/expect", k, expect);
-         }
-       }
-     }
-     else
-     {
-       same = false;
-       if (diff != null)
-       {
-         setMapField(diff, mapKey + "/expect", k, expect);
-       }
-     }
-   }
-   return same;
- }
-
- /**
-  * Copies every simple, list and map field of record into diff, appending
-  * "/" + keySuffix to each field key. Does nothing when diff or record is null.
-  */
- private static void setZNRecord(ZNRecord diff, ZNRecord record, String keySuffix)
- {
-   if (diff != null && record != null)
-   {
-     final String suffix = "/" + keySuffix;
-
-     for (String simpleKey : record.getSimpleFields().keySet())
-     {
-       diff.setSimpleField(simpleKey + suffix, record.getSimpleField(simpleKey));
-     }
-     for (String listKey : record.getListFields().keySet())
-     {
-       diff.setListField(listKey + suffix, record.getListField(listKey));
-     }
-     for (String mapKey : record.getMapFields().keySet())
-     {
-       diff.setMapField(mapKey + suffix, record.getMapField(mapKey));
-     }
-   }
- }
-
- /**
-  * Compares two whole znode records: simple fields, list fields and map fields.
-  *
-  * Fixes over the previous version: a field missing on one side makes the result false
-  * even when no diff record is supplied; list fields are compared at all; and simple
-  * fields present only in expect are detected.
-  *
-  * @param actual record read from the znode; may be null
-  * @param expect expected record; may be null
-  * @param diff   record collecting differences under key + "/actual" and
-  *               key + "/expect"; may be null
-  * @return true iff both records are null or all their fields compare equal
-  */
- private static boolean compareZnodeValue(ZNRecord actual, ZNRecord expect, ZNRecord diff)
- {
-   if (actual == null && expect == null)
-   {
-     return true;
-   }
-   if (actual == null)
-   {
-     setZNRecord(diff, expect, "expect"); // no-op when diff is null
-     return false;
-   }
-   if (expect == null)
-   {
-     setZNRecord(diff, actual, "actual"); // no-op when diff is null
-     return false;
-   }
-
-   boolean ret = true;
-
-   // simple fields
-   for (String key : actual.getSimpleFields().keySet())
-   {
-     if (!compareSingleValue(actual.getSimpleField(key),
-         expect.getSimpleField(key),
-         key,
-         diff))
-     {
-       ret = false;
-     }
-   }
-   for (String key : expect.getSimpleFields().keySet())
-   {
-     if (!actual.getSimpleFields().containsKey(key))
-     {
-       ret = false;
-       if (diff != null)
-       {
-         diff.setSimpleField(key + "/expect", expect.getSimpleField(key));
-       }
-     }
-   }
-
-   // list fields: keys of actual first (a missing expected list is passed as null and
-   // handled by compareListValue), then keys that exist only in expect
-   for (String key : actual.getListFields().keySet())
-   {
-     List<String> expectList =
-         expect.getListFields().containsKey(key) ? expect.getListField(key) : null;
-     if (!compareListValue(actual.getListField(key), expectList, key, diff))
-     {
-       ret = false;
-     }
-   }
-   for (String key : expect.getListFields().keySet())
-   {
-     if (!actual.getListFields().containsKey(key))
-     {
-       ret = false;
-       if (diff != null)
-       {
-         diff.setListField(key + "/expect", expect.getListField(key));
-       }
-     }
-   }
-
-   // map fields: common keys are compared once; one-sided keys always count as a mismatch
-   for (String key : actual.getMapFields().keySet())
-   {
-     if (!expect.getMapFields().containsKey(key))
-     {
-       ret = false;
-       if (diff != null)
-       {
-         diff.setMapField(key + "/actual", actual.getMapField(key));
-       }
-     }
-     else if (!compareMapValue(actual.getMapField(key), expect.getMapField(key), key, diff))
-     {
-       ret = false;
-     }
-   }
-   for (String key : expect.getMapFields().keySet())
-   {
-     if (!actual.getMapFields().containsKey(key))
-     {
-       ret = false;
-       if (diff != null)
-       {
-         diff.setMapField(key + "/expect", expect.getMapField(key));
-       }
-     }
-   }
-   return ret;
- }
-
- /**
-  * Empties the simple, list and map fields of record; a null record is ignored.
-  */
- private static void resetZNRecord(ZNRecord record)
- {
-   if (record == null)
-   {
-     return;
-   }
-   record.getSimpleFields().clear();
-   record.getListFields().clear();
-   record.getMapFields().clear();
- }
-
- /**
-  * Checks whether the value addressed by (type, key) in current matches expect.
-  *
-  * A null expect means "no expectation" and always matches. Otherwise diff is cleared and
-  * then filled by the comparison that is selected through getValueType.
-  *
-  * @param current record read from the znode; may be null
-  * @param type    property type the key refers to
-  * @param key     field key
-  * @param expect  expected value holder, or null for no expectation
-  * @param diff    record collecting differences; may be null
-  * @return true iff there is no expectation or the addressed value matches it; false for
-  *         an invalid key
-  */
- private static boolean isValueExpected(ZNRecord current,
-     ZnodePropertyType type,
-     String key,
-     ZnodeValue expect,
-     ZNRecord diff)
- {
-   if (expect == null)
-   {
-     return true;
-   }
-
-   resetZNRecord(diff);
-   switch (getValueType(type, key))
-   {
-   case SINGLE_VALUE:
-     return compareSingleValue(getSingleValue(current, type, key),
-         expect._singleValue,
-         key,
-         diff);
-   case LIST_VALUE:
-     return compareListValue(getListValue(current, key), expect._listValue, key, diff);
-   case MAP_VALUE:
-     return compareMapValue(getMapValue(current, key), expect._mapValue, key, diff);
-   case ZNODE_VALUE:
-     return compareZnodeValue(current, expect._znodeValue, diff);
-   default:
-     // INVALID (or anything unknown) never matches
-     return false;
-   }
- }
-
- /**
-  * Writes one string value into record at the position addressed by key: the simple field
-  * key, the list element key1/index, or the map entry key1/key2.
-  *
-  * For LIST and MAP the target list/map must already exist; otherwise a warning is logged
-  * and nothing is written.
-  */
- private static void setSingleValue(ZNRecord record,
-     ZnodePropertyType type,
-     String key,
-     String value)
- {
-   final String[] parts = key.split("/");
-
-   switch (type)
-   {
-   case SIMPLE:
-   {
-     record.setSimpleField(key, value);
-     return;
-   }
-   case LIST:
-   {
-     final List<String> target = record.getListField(parts[0]);
-     if (target == null)
-     {
-       logger.warn("invalid key for list field: " + key
-           + ", value for key part-1 doesn't exist");
-       return;
-     }
-     final int position = Integer.parseInt(parts[1]);
-     // replace in place: drop the old element, insert the new one at the same index
-     target.remove(position);
-     target.add(position, value);
-     return;
-   }
-   case MAP:
-   {
-     final Map<String, String> target = record.getMapField(parts[0]);
-     if (target == null)
-     {
-       logger.warn("invalid key for map field: " + key
-           + ", value for key part-1 doesn't exist");
-       return;
-     }
-     target.put(parts[1], value);
-     return;
-   }
-   default:
-     return;
-   }
- }
-
- /**
- * Stores value as the list field key of record, via ZNRecord.setListField.
- * record is dereferenced unconditionally, so it must not be null.
- */
- private static void setListValue(ZNRecord record, String key, List<String> value)
- {
- record.setListField(key, value);
- }
-
- /**
- * Stores value as the map field key of record, via ZNRecord.setMapField.
- * record is dereferenced unconditionally, so it must not be null.
- */
- private static void setMapValue(ZNRecord record, String key, Map<String, String> value)
- {
- record.setMapField(key, value);
- }
-
- /**
-  * Removes the single value addressed by key from record: the simple field key, the list
-  * element key1/index, or the map entry key1/key2.
-  *
-  * A null record is ignored. For LIST and MAP a warning is logged and nothing is removed
-  * when the target list/map does not exist.
-  */
- private static void removeSingleValue(ZNRecord record,
-     ZnodePropertyType type,
-     String key)
- {
-   if (record == null)
-   {
-     return;
-   }
-
-   final String[] parts = key.split("/");
-   switch (type)
-   {
-   case SIMPLE:
-   {
-     record.getSimpleFields().remove(key);
-     return;
-   }
-   case LIST:
-   {
-     final List<String> target = record.getListField(parts[0]);
-     if (target == null)
-     {
-       logger.warn("invalid key for list field: " + key
-           + ", value for key part-1 doesn't exist");
-       return;
-     }
-     final int position = Integer.parseInt(parts[1]);
-     target.remove(position);
-     return;
-   }
-   case MAP:
-   {
-     final Map<String, String> target = record.getMapField(parts[0]);
-     if (target == null)
-     {
-       logger.warn("invalid key for map field: " + key
-           + ", value for key part-1 doesn't exist");
-       return;
-     }
-     target.remove(parts[1]);
-     return;
-   }
-   default:
-     return;
-   }
- }
-
- /**
-  * Removes the whole list field key from record.
-  *
-  * The guard was inverted before: it dereferenced record exactly when record (or its
-  * list-field map) was null, and did nothing otherwise. Now a null record or null
-  * list-field map is ignored and the field is removed in every other case.
-  */
- private static void removeListValue(ZNRecord record, String key)
- {
-   if (record == null || record.getListFields() == null)
-   {
-     return;
-   }
-   record.getListFields().remove(key);
- }
-
- /**
-  * Removes the whole map field key from record.
-  *
-  * A null record (the znode did not exist when it was read) or a null map-field map is
-  * ignored instead of raising a NullPointerException, consistent with removeSingleValue.
-  */
- private static void removeMapValue(ZNRecord record, String key)
- {
-   if (record == null || record.getMapFields() == null)
-   {
-     return;
-   }
-   record.getMapFields().remove(key);
- }
-
- /**
-  * Evaluates a verify command against the record read from the znode.
-  *
-  * @param actual  record read from the znode; may be null
-  * @param command command whose znode argument and trigger expectation are checked
-  * @param diff    record collecting differences; may be null
-  * @return for "==" whether the value matches the expectation, for "!=" the negation;
-  *         false (with a warning) for any other operation
-  */
- private static boolean executeVerifier(ZNRecord actual,
-     TestCommand command,
-     ZNRecord diff)
- {
-   final ZnodeOpArg opArg = command._znodeOpArg;
-   final ZnodeValue expected = command._trigger._expectValue;
-
-   final boolean matches =
-       isValueExpected(actual, opArg._propertyType, opArg._key, expected, diff);
-
-   final String op = opArg._operation;
-   if (op.equals("!="))
-   {
-     return !matches;
-   }
-   if (op.equals("=="))
-   {
-     return matches;
-   }
-
-   logger.warn("fail to execute (unsupport operation=" + op + "):" + op);
-   return false;
- }
-
- /**
-  * Applies a modify command to a znode if its current value matches expect.
-  *
-  * The znode is read together with its version, the "+" (set) or "-" (remove) operation
-  * is applied to the in-memory record, and the result is written back with that version
-  * (or the znode is deleted when the record ends up null). The whole read-check-write
-  * cycle is attempted up to 3 times with a doubling back-off, so a write conflict or an
-  * expectation that is not met yet leads to a retry.
-  *
-  * Changes over the previous version: an interrupt during the back-off sleep restores the
-  * thread's interrupt flag and stops retrying (it used to print a stack trace and carry
-  * on), and the exceptions that trigger a retry are logged at debug level instead of
-  * being dropped silently.
-  *
-  * @param expect   expected current value, or null for no expectation
-  * @param arg      znode path, property type, key, operation and update value
-  * @param zkClient client used to read and write the znode
-  * @param diff     record collecting differences; may be null
-  * @return true iff the modification was written; false on an unsupported operation, on
-  *         interruption, or when all attempts failed
-  */
- private static boolean compareAndSetZnode(ZnodeValue expect,
-     ZnodeOpArg arg,
-     ZkClient zkClient,
-     ZNRecord diff)
- {
-   String path = arg._znodePath;
-   ZnodePropertyType type = arg._propertyType;
-   String key = arg._key;
-
-   // retry 3 times in case there are write conflicts
-   long backoffTime = 20; // ms
-   for (int i = 0; i < 3; i++)
-   {
-     try
-     {
-       Stat stat = new Stat();
-       ZNRecord record = zkClient.<ZNRecord> readDataAndStat(path, stat, true);
-
-       if (isValueExpected(record, type, key, expect, diff))
-       {
-         if (arg._operation.compareTo("+") == 0)
-         {
-           if (record == null)
-           {
-             record = new ZNRecord("default");
-           }
-           switch (getValueType(arg._propertyType, arg._key))
-           {
-           case SINGLE_VALUE:
-             setSingleValue(record,
-                 arg._propertyType,
-                 arg._key,
-                 arg._updateValue._singleValue);
-             break;
-           case LIST_VALUE:
-             setListValue(record, arg._key, arg._updateValue._listValue);
-             break;
-           case MAP_VALUE:
-             setMapValue(record, arg._key, arg._updateValue._mapValue);
-             break;
-           case ZNODE_VALUE:
-             // deep copy
-             record =
-                 ZNRECORD_SERIALIZER.deserialize(ZNRECORD_SERIALIZER.serialize(arg._updateValue._znodeValue));
-             break;
-           default:
-             // INVALID: nothing to set, the record is written back unchanged
-             break;
-           }
-         }
-         else if (arg._operation.compareTo("-") == 0)
-         {
-           switch (getValueType(arg._propertyType, arg._key))
-           {
-           case SINGLE_VALUE:
-             removeSingleValue(record, arg._propertyType, arg._key);
-             break;
-           case LIST_VALUE:
-             removeListValue(record, arg._key);
-             break;
-           case MAP_VALUE:
-             removeMapValue(record, arg._key);
-             break;
-           case ZNODE_VALUE:
-             // a null record makes the write step below delete the znode
-             record = null;
-             break;
-           default:
-             break;
-           }
-         }
-         else
-         {
-           logger.warn("fail to execute (unsupport operation): " + arg._operation);
-           return false;
-         }
-
-         if (record == null)
-         {
-           zkClient.delete(path);
-         }
-         else
-         {
-           try
-           {
-             zkClient.createPersistent(path, true);
-           }
-           catch (ZkNodeExistsException e)
-           {
-             // OK
-           }
-           // versioned write: fails with ZkBadVersionException on a concurrent update
-           zkClient.writeData(path, record, stat.getVersion());
-         }
-         return true;
-       }
-     }
-     catch (ZkBadVersionException e)
-     {
-       logger.debug("write conflict on " + path + ", will retry", e);
-     }
-     catch (PropertyStoreException e)
-     {
-       logger.debug("fail to copy update value for " + path + ", will retry", e);
-     }
-
-     try
-     {
-       Thread.sleep(backoffTime);
-     }
-     catch (InterruptedException e)
-     {
-       // keep the interrupt visible to the caller and give up
-       Thread.currentThread().interrupt();
-       return false;
-     }
-     backoffTime *= 2;
-   }
-
-   return false;
- }
-
- /**
-  * Runnable that executes one test command: it waits until the command's start time and
-  * then attempts the command every SLEEP_TIME ms until it succeeds or the trigger's
-  * timeout has elapsed.
-  *
-  * Changes over the previous version: the deadline is start time + timeout (it used to be
-  * measured from the moment the thread was launched, so a command whose start delay
-  * exceeded its timeout got exactly one attempt); an interrupt restores the thread's
-  * interrupt flag; failures are logged through the logger instead of printStackTrace().
-  */
- private static class ExecuteCommand implements Runnable
- {
-   private final TestCommand _command;
-   private final long _startTime;
-   private final ZkClient _zkClient;
-   private final CountDownLatch _countDown;
-   private final Map<TestCommand, Boolean> _testResults;
-
-   /**
-    * @param startTime   absolute time (ms since epoch) at which the first attempt is made
-    * @param command     command to execute
-    * @param countDown   latch counted down once when this command has finished
-    * @param zkClient    shared client; closed by the worker that brings the latch to 0
-    * @param testResults result map; the command is set to true on success
-    */
-   public ExecuteCommand(long startTime,
-       TestCommand command,
-       CountDownLatch countDown,
-       ZkClient zkClient,
-       Map<TestCommand, Boolean> testResults)
-   {
-     _startTime = startTime;
-     _command = command;
-     _countDown = countDown;
-     _zkClient = zkClient;
-     _testResults = testResults;
-   }
-
-   @Override
-   public void run()
-   {
-     boolean result = false;
-     long now = System.currentTimeMillis();
-     // attempts are made within [start, start + timeout]
-     final long deadline = Math.max(now, _startTime) + _command._trigger._timeout;
-     ZNRecord diff = new ZNRecord("diff");
-     try
-     {
-       if (now < _startTime)
-       {
-         Thread.sleep(_startTime - now);
-       }
-
-       do
-       {
-         result = attempt(diff);
-         if (result)
-         {
-           _command._finishTimestamp = System.currentTimeMillis();
-           CommandType type = _command._commandType;
-           if (type == CommandType.START || type == CommandType.STOP)
-           {
-             logger.info("result:" + result + ", " + _command.toString());
-           }
-           _testResults.put(_command, true);
-           break;
-         }
-
-         Thread.sleep(SLEEP_TIME);
-         now = System.currentTimeMillis();
-       }
-       while (now <= deadline);
-     }
-     catch (InterruptedException e)
-     {
-       Thread.currentThread().interrupt();
-       logger.warn("interrupted while executing " + _command);
-     }
-     catch (Exception e)
-     {
-       // boundary of the worker thread: report and let finally do the bookkeeping
-       logger.error("fail to execute " + _command, e);
-     }
-     finally
-     {
-       if (result == false)
-       {
-         _command._finishTimestamp = System.currentTimeMillis();
-         logger.error("result:" + result + ", diff: " + diff);
-       }
-       _countDown.countDown();
-       // the worker that observes the latch at 0 closes the shared client
-       if (_countDown.getCount() == 0)
-       {
-         if (_zkClient != null && _zkClient.getConnection() != null)
-         {
-           _zkClient.close();
-         }
-       }
-     }
-   }
-
-   // Makes one attempt at the command; returns true when the command is done.
-   private boolean attempt(ZNRecord diff)
-   {
-     final CommandType type = _command._commandType;
-     if (type == CommandType.MODIFY)
-     {
-       ZnodeOpArg arg = _command._znodeOpArg;
-       final ZnodeValue expectValue = _command._trigger._expectValue;
-       return compareAndSetZnode(expectValue, arg, _zkClient, diff);
-     }
-     if (type == CommandType.VERIFY)
-     {
-       final String znodePath = _command._znodeOpArg._znodePath;
-       ZNRecord record = _zkClient.<ZNRecord> readData(znodePath, true);
-       return executeVerifier(record, _command, diff);
-     }
-     if (type == CommandType.START)
-     {
-       // TODO add data trigger for START command
-       _command._nodeOpArg._thread.start();
-       return true;
-     }
-     if (type == CommandType.STOP)
-     {
-       // TODO add data trigger for STOP command
-       HelixManager manager = _command._nodeOpArg._manager;
-       manager.disconnect();
-       _command._nodeOpArg._thread.interrupt();
-       return true;
-     }
-     throw new IllegalArgumentException("Unsupport command type (was "
-         + _command._commandType + ")");
-   }
- }
-
- /**
-  * Sorts the commands by trigger start time and launches one worker thread per command.
-  *
-  * Note that commandList itself is sorted in place. All workers share one ZkClient, which
-  * is closed by the worker that brings countDown to zero; when there are no commands the
-  * client is closed here, because no worker would ever do it.
-  *
-  * Changes over the previous version: start times are compared as longs (the int cast of
-  * the difference could overflow and flip the order), Boolean.FALSE replaces
-  * new Boolean(false), and the client no longer leaks for an empty command list.
-  *
-  * @param commandList commands to run; sorted in place
-  * @param zkAddr      ZooKeeper address to connect to
-  * @param countDown   latch that every worker counts down once when it has finished
-  * @return map from command to result, initialised to false and set to true by the
-  *         worker on success
-  */
- private static Map<TestCommand, Boolean> executeTestHelper(List<TestCommand> commandList,
-     String zkAddr,
-     CountDownLatch countDown)
- {
-   final Map<TestCommand, Boolean> testResults =
-       new ConcurrentHashMap<TestCommand, Boolean>();
-
-   ZkClient zkClient = new ZkClient(zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
-   zkClient.setZkSerializer(new ZNRecordSerializer());
-
-   if (commandList.isEmpty())
-   {
-     zkClient.close();
-     return testResults;
-   }
-
-   // sort on trigger's start time, stable sort
-   Collections.sort(commandList, new Comparator<TestCommand>()
-   {
-     @Override
-     public int compare(TestCommand o1, TestCommand o2)
-     {
-       final long s1 = o1._trigger._startTime;
-       final long s2 = o2._trigger._startTime;
-       return (s1 < s2) ? -1 : ((s1 > s2) ? 1 : 0);
-     }
-   });
-
-   for (TestCommand command : commandList)
-   {
-     testResults.put(command, Boolean.FALSE);
-
-     TestTrigger trigger = command._trigger;
-     // the trigger's start time is an offset from now
-     command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
-     new Thread(new ExecuteCommand(command._startTimestamp,
-         command,
-         countDown,
-         zkClient,
-         testResults)).start();
-   }
-
-   return testResults;
- }
-
- /**
- * Starts executing the given commands and returns without waiting for them to finish.
- * The result map produced by executeTestHelper is discarded.
- *
- * @param commandList commands to run; the list is sorted in place by executeTestHelper
- * @param zkAddr ZooKeeper address passed on to executeTestHelper
- */
- public static void executeTestAsync(List<TestCommand> commandList, String zkAddr) throws InterruptedException
- {
- CountDownLatch countDown = new CountDownLatch(commandList.size());
- executeTestHelper(commandList, zkAddr, countDown);
- }
-
- /**
- * Executes the given commands and blocks until every one of them has counted the latch
- * down. There is no upper bound on the wait (see the TODO below).
- *
- * @param commandList commands to run; the list is sorted in place by executeTestHelper
- * @param zkAddr ZooKeeper address passed on to executeTestHelper
- * @return the map from command to result returned by executeTestHelper
- * @throws InterruptedException if the calling thread is interrupted while waiting
- */
- public static Map<TestCommand, Boolean> executeTest(List<TestCommand> commandList,
- String zkAddr) throws InterruptedException
- {
- final CountDownLatch countDown = new CountDownLatch(commandList.size());
- Map<TestCommand, Boolean> testResults =
- executeTestHelper(commandList, zkAddr, countDown);
-
- // TODO add timeout
- countDown.await();
-
- return testResults;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/TestTrigger.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/TestTrigger.java b/helix-core/src/main/java/com/linkedin/helix/tools/TestTrigger.java
deleted file mode 100644
index 19d1b03..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/TestTrigger.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.ZNRecord;
-
-
-/**
- * Trigger of a test command: a start time, a timeout and an optional expected value.
- * The string form is "<startTime~timeoutms, expectValue>".
- */
-public class TestTrigger
-{
-  /** start time of the command */
-  public long _startTime;
-  /** timeout of the command; printed with unit ms by toString() */
-  public long _timeout;
-  /** expected value (data trigger), or null when there is no data trigger */
-  public ZnodeValue _expectValue;
-
-  /**
-   * no time or data trigger
-   */
-  public TestTrigger()
-  {
-    this(0, 0, (ZnodeValue) null);
-  }
-
-  /**
-   * time trigger with a start time, no timeout and no data trigger
-   *
-   * @param startTime
-   */
-  public TestTrigger(long startTime)
-  {
-    this(startTime, 0, (ZnodeValue) null);
-  }
-
-  /**
-   * simple field data trigger
-   *
-   * @param startTime
-   * @param timeout
-   * @param expect expected simple-field value
-   */
-  public TestTrigger(long startTime, long timeout, String expect)
-  {
-    this(startTime, timeout, new ZnodeValue(expect));
-  }
-
-  /**
-   * list field data trigger
-   *
-   * @param startTime
-   * @param timeout
-   * @param expect expected list-field value
-   */
-  public TestTrigger(long startTime, long timeout, List<String> expect)
-  {
-    this(startTime, timeout, new ZnodeValue(expect));
-  }
-
-  /**
-   * map field data trigger
-   *
-   * @param startTime
-   * @param timeout
-   * @param expect expected map-field value
-   */
-  public TestTrigger(long startTime, long timeout, Map<String, String> expect)
-  {
-    this(startTime, timeout, new ZnodeValue(expect));
-  }
-
-  /**
-   * znode data trigger
-   *
-   * @param startTime
-   * @param timeout
-   * @param expect expected znode record
-   */
-  public TestTrigger(long startTime, long timeout, ZNRecord expect)
-  {
-    this(startTime, timeout, new ZnodeValue(expect));
-  }
-
-  /**
-   * trigger with an explicit expected-value holder; all other constructors delegate here
-   *
-   * @param startTime
-   * @param timeout
-   * @param expect expected value, or null for no data trigger
-   */
-  public TestTrigger(long startTime, long timeout, ZnodeValue expect)
-  {
-    _startTime = startTime;
-    _timeout = timeout;
-    _expectValue = expect;
-  }
-
-  @Override
-  public String toString()
-  {
-    return "<" + _startTime + "~" + _timeout + "ms, " + _expectValue + ">";
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/YAISCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/YAISCalculator.java b/helix-core/src/main/java/com/linkedin/helix/tools/YAISCalculator.java
deleted file mode 100644
index 9da7e73..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/YAISCalculator.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Experimental calculator that spreads partition replicas over a set of nodes and prints
- * the resulting assignment per node.
- *
- * Replica 0 of every partition is placed round-robin. For each further replica, the
- * partitions whose replica 0 lives on a node are handed out, in random order, round-robin
- * to the other nodes.
- */
-public class YAISCalculator
-{
-  /** Assignment of (partition, replica) pairs to nodes, indexed both ways. */
-  static class Assignment
-  {
-    private final int numNodes;
-    private final int replication;
-    Partition[] partitions;
-    Node[] nodes;
-
-    public Assignment(int numNodes, int numPartitions, int replication)
-    {
-      this.numNodes = numNodes;
-      this.replication = replication;
-      partitions = new Partition[numPartitions];
-      for (int i = 0; i < numPartitions; i++)
-      {
-        partitions[i] = new Partition(i, replication);
-      }
-      nodes = new Node[numNodes];
-      for (int i = 0; i < numNodes; i++)
-      {
-        nodes[i] = new Node(replication);
-      }
-    }
-
-    /** Records that replica replicaId of partition partitionId lives on node nodeId. */
-    public void assign(int partitionId, int replicaId, int nodeId)
-    {
-      System.out.println("Assigning (" + partitionId + "," + replicaId
-          + ") to " + nodeId);
-      partitions[partitionId].nodeIds[replicaId] = nodeId;
-      nodes[nodeId].partitionLists.get(replicaId).push(partitionId);
-    }
-
-    /** Not implemented; currently a no-op. */
-    public void unassign(int partitionId, int replicaId)
-    {
-
-    }
-
-    /** Returns the ids of all partitions whose replica replicaId lives on node nodeId. */
-    Integer[] getPartitionsPerNode(int nodeId, int replicaId)
-    {
-      List<Integer> partitionsList = new ArrayList<Integer>();
-      for (Partition p : partitions)
-      {
-        if (p.nodeIds[replicaId] == nodeId)
-        {
-          partitionsList.add(p.partionId);
-        }
-      }
-      return partitionsList.toArray(new Integer[partitionsList.size()]);
-    }
-
-    /** Prints one line per (node, replica) listing the partitions assigned to it. */
-    public void printPerNode()
-    {
-      for (int nodeId = 0; nodeId < numNodes; nodeId++)
-      {
-        for (int r = 0; r < replication; r++)
-        {
-          StringBuilder sb = new StringBuilder();
-          sb.append("(").append(nodeId).append(",").append(r).append("):\t");
-          for (int partitionId : nodes[nodeId].partitionLists.get(r))
-          {
-            sb.append(partitionId).append(",");
-          }
-          System.out.println(sb.toString());
-        }
-      }
-    }
-  }
-
-  /** A partition and, per replica, the node it is assigned to (-1 = unassigned). */
-  static class Partition
-  {
-    final int partionId;
-    int nodeIds[];
-
-    public Partition(int partionId, int replication)
-    {
-      this.partionId = partionId;
-      nodeIds = new int[replication];
-      Arrays.fill(nodeIds, -1);
-    }
-  }
-
-  /** A node and, per replica, the list of partitions assigned to it. */
-  static class Node
-  {
-    private final int replication;
-    ArrayList<LinkedList<Integer>> partitionLists;
-
-    public Node(int replication)
-    {
-      this.replication = replication;
-      partitionLists = new ArrayList<LinkedList<Integer>>(replication);
-      for (int i = 0; i < replication; i++)
-      {
-        partitionLists.add(new LinkedList<Integer>());
-      }
-    }
-  }
-
-  public static void main(String[] args)
-  {
-    doAssignment(new int[]
-    { 5 }, 120, 3);
-  }
-
-  /**
-   * Computes and prints an assignment.
-   *
-   * Fixes over the previous version: the replica-placement loop now counts down the
-   * partitions still to be placed and advances through the unused ones (it never
-   * terminated before), it no longer asks Random for a value in an empty range when a
-   * node holds no partitions, and it is skipped when there is only one node (no other
-   * node could take a replica). The cluster-expansion step is still unimplemented; it
-   * used to spin forever and now only reports how many partitions would have to move.
-   *
-   * @param nodes       nodes[0] is the initial node count, further entries are nodes
-   *                    added in later expansion steps
-   * @param partitions  number of partitions
-   * @param replication number of replicas per partition
-   */
-  private static void doAssignment(int[] nodes, int partitions, int replication)
-  {
-    int N = nodes[0];
-    if (N <= 0)
-    {
-      throw new IllegalArgumentException("initial node count must be positive (was " + N
-          + ")");
-    }
-    int totalNodes = 0;
-    for (int temp : nodes)
-    {
-      totalNodes += temp;
-    }
-    Assignment assignment = new Assignment(totalNodes, partitions, replication);
-
-    // replica 0: round-robin over the initial nodes
-    int nodeId = 0;
-    for (int i = 0; i < partitions; i++)
-    {
-      assignment.assign(i, 0, nodeId);
-      nodeId = (nodeId + 1) % N;
-    }
-
-    // further replicas need at least one other node to go to
-    Random random = new Random();
-    for (int r = 1; N > 1 && r < replication; r++)
-    {
-      for (int id = 0; id < N; id++)
-      {
-        Integer[] partitionsPerNode = assignment.getPartitionsPerNode(id, 0);
-        boolean[] used = new boolean[partitionsPerNode.length];
-        System.out.println(id + "-" + partitionsPerNode.length);
-        nodeId = (id + r) % N;
-        int count = partitionsPerNode.length; // partitions still to be placed
-        while (count > 0)
-        {
-          if (nodeId != id)
-          {
-            // pick the nextInt-th partition among those not placed yet
-            int nextInt = random.nextInt(count);
-            int temp = 0;
-            for (int b = 0; b < used.length; b++)
-            {
-              if (used[b])
-              {
-                continue;
-              }
-              if (temp == nextInt)
-              {
-                assignment.assign(partitionsPerNode[b], r, nodeId);
-                used[b] = true;
-                count--;
-                break;
-              }
-              temp++;
-            }
-          }
-          nodeId = (nodeId + 1) % N;
-        }
-      }
-    }
-
-    if (nodes.length > 1)
-    {
-      int prevNodeCount = nodes[0];
-      for (int i = 1; i < nodes.length; i++)
-      {
-        int newNodeCount = prevNodeCount + nodes[i];
-        int masterPartitionsToMove = (int) ((partitions * 1.0 / prevNodeCount - partitions
-            * 1.0 / newNodeCount) * 1 * prevNodeCount);
-        if (masterPartitionsToMove > 0)
-        {
-          // moving partitions onto added nodes is not implemented
-          System.err.println("Expansion to " + newNodeCount + " nodes would move "
-              + masterPartitionsToMove + " master partitions; not implemented, skipping");
-        }
-        prevNodeCount = newNodeCount;
-      }
-    }
-    assignment.printPerNode();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/ZKDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/ZKDumper.java b/helix-core/src/main/java/com/linkedin/helix/tools/ZKDumper.java
deleted file mode 100644
index 2a8b7af..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/ZKDumper.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.List;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
-import com.linkedin.helix.manager.zk.ZkClient;
-
-/**
- * Dumps the Zookeeper file structure on to Disk
- *
- * @author kgopalak
- *
- */
-@SuppressWarnings("static-access")
-public class ZKDumper
-{
- private ZkClient client;
- private FilenameFilter filter;
- static Options options;
- private String suffix = "";
- //enable by default
- private boolean removeSuffix=true;
-
- /** @return the suffix appended to the local file name of every downloaded node */
- public String getSuffix()
- {
-   return this.suffix;
- }
-
- /** @param suffix suffix to append to the local file name of every downloaded node */
- public void setSuffix(final String suffix)
- {
-   this.suffix = suffix;
- }
-
- /** @return whether {@link #upload} strips the file-name suffix from the ZK path */
- public boolean isRemoveSuffix()
- {
-   return this.removeSuffix;
- }
-
- /** @param removeSuffix whether {@link #upload} strips the file-name suffix from the ZK path */
- public void setRemoveSuffix(final boolean removeSuffix)
- {
-   this.removeSuffix = removeSuffix;
- }
-
- // Command-line definition: exactly one of --delete / --upload / --download
- // is required, plus the ZK address, ZK path and local path options.
- static
- {
-   options = new Options();
-   OptionGroup actions = new OptionGroup();
-
-   // OptionBuilder keeps its state in static fields, so each option is
-   // configured with plain static calls and then materialised by create().
-   OptionBuilder.withLongOpt("download");
-   OptionBuilder.withDescription("Download from ZK to File System");
-   Option downloadOpt = OptionBuilder.create();
-   downloadOpt.setArgs(0);
-
-   OptionBuilder.withLongOpt("addSuffix");
-   OptionBuilder.withDescription("add suffix to every file downloaded from ZK");
-   Option addSuffixOpt = OptionBuilder.create();
-   addSuffixOpt.setArgs(1);
-   addSuffixOpt.setRequired(false);
-
-   OptionBuilder.withLongOpt("upload");
-   OptionBuilder.withDescription("Upload from File System to ZK");
-   Option uploadOpt = OptionBuilder.create();
-   uploadOpt.setArgs(0);
-
-   OptionBuilder.withLongOpt("removeSuffix");
-   OptionBuilder.withDescription("remove suffix from every file uploaded to ZK");
-   Option removeSuffixOpt = OptionBuilder.create();
-   removeSuffixOpt.setArgs(0);
-   removeSuffixOpt.setRequired(false);
-
-   OptionBuilder.withLongOpt("delete");
-   OptionBuilder.withDescription("Delete given path from ZK");
-   Option deleteOpt = OptionBuilder.create();
-
-   actions.setRequired(true);
-   actions.addOption(deleteOpt);
-   actions.addOption(uploadOpt);
-   actions.addOption(downloadOpt);
-   options.addOptionGroup(actions);
-
-   options.addOption("zkSvr", true, "Zookeeper address");
-   options.addOption("zkpath", true, "Zookeeper path");
-   options.addOption("fspath", true, "Path on local Filesystem to dump");
-   options.addOption("h", "help", false, "Print this usage information");
-   options.addOption("v", "verbose", false, "Print out VERBOSE information");
-   options.addOption(addSuffixOpt);
-   options.addOption(removeSuffixOpt);
- }
-
- public ZKDumper(String zkAddress)
- {
- client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
- ZkSerializer zkSerializer = new ZkSerializer()
- {
-
- @Override
- public byte[] serialize(Object arg0) throws ZkMarshallingError
- {
- return arg0.toString().getBytes();
- }
-
- @Override
- public Object deserialize(byte[] arg0) throws ZkMarshallingError
- {
- return new String(arg0);
- }
- };
- client.setZkSerializer(zkSerializer);
- filter = new FilenameFilter()
- {
-
- @Override
- public boolean accept(File dir, String name)
- {
- return !name.startsWith(".");
- }
- };
- }
-
- public static void main(String[] args) throws Exception
- {
- if (args == null || args.length == 0)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("java " + ZKDumper.class.getName(), options);
- System.exit(1);
- }
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse(options, args);
- cmd.hasOption("zkSvr");
- boolean download = cmd.hasOption("download");
- boolean upload = cmd.hasOption("upload");
- boolean del = cmd.hasOption("delete");
- String zkAddress = cmd.getOptionValue("zkSvr");
- String zkPath = cmd.getOptionValue("zkpath");
- String fsPath = cmd.getOptionValue("fspath");
-
- ZKDumper zkDump = new ZKDumper(zkAddress);
- if (download)
- {
- if (cmd.hasOption("addSuffix"))
- {
- zkDump.suffix = cmd.getOptionValue("addSuffix");
- }
- zkDump.download(zkPath, fsPath + zkPath);
- }
- if (upload)
- {
- if (cmd.hasOption("removeSuffix"))
- {
- zkDump.removeSuffix = true;
- }
- zkDump.upload(zkPath, fsPath);
- }
- if (del)
- {
- zkDump.delete(zkPath);
- }
- }
-
- /**
-  * Removes {@code zkPath} and everything below it from ZooKeeper.
-  *
-  * @param zkPath root of the subtree to delete
-  */
- private void delete(final String zkPath)
- {
-   this.client.deleteRecursive(zkPath);
- }
-
- /**
-  * Recursively uploads a local file or directory to ZooKeeper. Directories
-  * become persistent nodes; regular files become persistent nodes whose
-  * payload is the file content with lines joined by {@code "\n"}.
-  *
-  * When {@code removeSuffix} is set and the local name contains a dot,
-  * everything from the first dot of the local name onwards is stripped from
-  * the end of the ZK path.
-  *
-  * @param zkPath target path in ZooKeeper
-  * @param fsPath local file or directory to upload
-  * @throws Exception if reading the file or writing to ZooKeeper fails
-  */
- public void upload(String zkPath, String fsPath) throws Exception
- {
-   File file = new File(fsPath);
-   System.out
-       .println("Uploading " + file.getCanonicalPath() + " to " + zkPath);
-   zkPath = zkPath.replaceAll("[/]+", "/");
-   int index = file.getName().indexOf(".");
-   if (removeSuffix && index > -1)
-   {
-     // Strip the suffix literally and only at the end of the path. The
-     // previous replaceAll() treated the suffix as a regex (so '.' matched
-     // any character) and removed every match anywhere in the path.
-     String fileSuffix = file.getName().substring(index);
-     if (zkPath.endsWith(fileSuffix))
-     {
-       zkPath = zkPath.substring(0, zkPath.length() - fileSuffix.length());
-     }
-   }
-   if (file.isDirectory())
-   {
-     File[] children = file.listFiles(filter);
-     client.createPersistent(zkPath, true);
-     if (children != null)
-     {
-       for (File child : children)
-       {
-         upload(zkPath + "/" + child.getName(), fsPath + "/" + child.getName());
-       }
-     }
-     return;
-   }
-
-   BufferedReader bfr = new BufferedReader(new FileReader(file));
-   try
-   {
-     StringBuilder sb = new StringBuilder();
-     String line;
-     String recordDelimiter = "";
-     while ((line = bfr.readLine()) != null)
-     {
-       sb.append(recordDelimiter).append(line);
-       recordDelimiter = "\n";
-     }
-     client.createPersistent(zkPath, sb.toString());
-   }
-   finally
-   {
-     try
-     {
-       bfr.close();
-     }
-     catch (IOException ignored)
-     {
-       // best effort: nothing useful can be done if closing the reader fails
-     }
-   }
- }
-
- /**
-  * Recursively dumps a ZooKeeper subtree to the local file system. A node
-  * with children becomes a directory; a node without children becomes a
-  * file named {@code fsPath + suffix} holding the node's string payload.
-  * The payload of a node that has children is not written.
-  *
-  * @param zkPath ZooKeeper path to dump
-  * @param fsPath local path that mirrors {@code zkPath}
-  * @throws Exception if reading from ZooKeeper or writing the file fails
-  */
- public void download(String zkPath, String fsPath) throws Exception
- {
-   List<String> children = client.getChildren(zkPath);
-   if (children != null && children.size() > 0)
-   {
-     new File(fsPath).mkdirs();
-     for (String child : children)
-     {
-       String childPath = zkPath.equals("/") ? "/" + child : zkPath + "/" + child;
-       download(childPath, fsPath + "/" + child);
-     }
-     return;
-   }
-
-   File target = new File(fsPath + suffix);
-   System.out.println("Saving " + zkPath + " to " + target.getCanonicalPath());
-   // The parent directory is only created by the branch above when an
-   // ancestor was visited; make sure it exists when the root itself is a leaf.
-   File parent = target.getParentFile();
-   if (parent != null)
-   {
-     parent.mkdirs();
-   }
-   Object readData = client.readData(zkPath);
-   FileWriter fileWriter = new FileWriter(target);
-   try
-   {
-     if (readData != null)
-     {
-       fileWriter.write((String) readData);
-     }
-   }
-   finally
-   {
-     // always release the file handle, also when write() fails
-     fileWriter.close();
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/ZKLogFormatter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/ZKLogFormatter.java b/helix-core/src/main/java/com/linkedin/helix/tools/ZKLogFormatter.java
deleted file mode 100644
index 87167d2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/ZKLogFormatter.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.beans.Introspector;
-import java.beans.PropertyDescriptor;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.text.DateFormat;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.Adler32;
-import java.util.zip.Checksum;
-
-import javax.xml.bind.annotation.adapters.HexBinaryAdapter;
-
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.Record;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.DataNode;
-import org.apache.zookeeper.server.DataTree;
-import org.apache.zookeeper.server.persistence.FileHeader;
-import org.apache.zookeeper.server.persistence.FileSnap;
-import org.apache.zookeeper.server.persistence.FileTxnLog;
-import org.apache.zookeeper.server.util.SerializeUtils;
-import org.apache.zookeeper.txn.TxnHeader;
-
-public class ZKLogFormatter
-{
- private static final Logger LOG = Logger.getLogger(ZKLogFormatter.class);
- private static DateFormat dateTimeInstance = DateFormat.getDateTimeInstance(
- DateFormat.SHORT, DateFormat.LONG);
- private static HexBinaryAdapter adapter = new HexBinaryAdapter();
- private static String fieldDelim = ":";
- private static String fieldSep = " ";
-
- static BufferedWriter bw = null;
- /**
-  * Formats a ZooKeeper transaction log or snapshot as text.
-  *
-  * @param args {@code <log|snapshot> input_file [output_file]}; without an
-  *             output file the result is printed to standard output
-  * @throws Exception if the input cannot be read or the output cannot be written
-  */
- public static void main(String[] args) throws Exception
- {
-   boolean knownMode =
-       args.length > 0 && (args[0].equals("log") || args[0].equals("snapshot"));
-   if ((args.length != 2 && args.length != 3) || !knownMode)
-   {
-     System.err.println("USAGE: LogFormatter <log|snapshot> log_file [output_file]");
-     System.exit(2);
-   }
-
-   if (args.length == 3)
-   {
-     bw = new BufferedWriter(new FileWriter(new File(args[2])));
-   }
-
-   try
-   {
-     if (args[0].equals("log"))
-     {
-       readTransactionLog(args[1]);
-     }
-     else
-     {
-       readSnapshotLog(args[1]);
-     }
-   }
-   finally
-   {
-     // Reset the shared writer so that a later call without an output file
-     // does not write to an already-closed stream.
-     BufferedWriter out = bw;
-     bw = null;
-     if (out != null)
-     {
-       out.close();
-     }
-   }
- }
-
- /**
-  * Reads a snapshot file, writes its session map, then writes one line per
-  * node of the data tree starting at "/".
-  *
-  * @param snapshotPath path of the snapshot file
-  * @throws Exception if the file cannot be read or its magic number is wrong
-  */
- private static void readSnapshotLog(String snapshotPath) throws Exception
- {
-   Map<Long, Integer> sessions = new HashMap<Long, Integer>();
-   DataTree dt = new DataTree();
-   FileInputStream fis = new FileInputStream(snapshotPath);
-   try
-   {
-     BinaryInputArchive ia = BinaryInputArchive.getArchive(fis);
-     FileHeader header = new FileHeader();
-     header.deserialize(ia, "fileheader");
-     if (header.getMagic() != FileSnap.SNAP_MAGIC)
-     {
-       throw new IOException("mismatching magic headers " + header.getMagic()
-           + " != " + FileSnap.SNAP_MAGIC);
-     }
-     SerializeUtils.deserializeSnapshot(dt, ia, sessions);
-   }
-   finally
-   {
-     // the stream was previously never closed
-     fis.close();
-   }
-
-   if (bw != null)
-   {
-     bw.write(sessions.toString());
-     bw.newLine();
-   }
-   else
-   {
-     System.out.println(sessions);
-   }
-   traverse(dt, 1, "/");
- }
-
- /*
-  * Level order traversal of the data tree. Every visited node is written
-  * through format() with a running id (starting at startId) and the id of
-  * the node it was enqueued from; the start node is its own parent.
-  */
- private static void traverse(DataTree dt, int startId, String startPath) throws Exception
- {
-   LinkedList<Pair> queue = new LinkedList<Pair>();
-   queue.add(new Pair(startPath, startId));
-   while (!queue.isEmpty())
-   {
-     Pair pair = queue.removeFirst();
-     String path = pair._path;
-     DataNode head = dt.getNode(path);
-     if (head == null)
-     {
-       // format() dereferences the node, so a missing node used to end in a
-       // NullPointerException; skip it instead.
-       LOG.warn("Skipping " + path + ": node not found in data tree");
-       continue;
-     }
-     byte[] data = null;
-     try
-     {
-       data = dt.getData(path, new Stat(), null);
-     }
-     catch (NoNodeException e)
-     {
-       LOG.warn("No data for " + path, e);
-     }
-     // print the node
-     format(startId, pair, head, data);
-     Set<String> children = head.getChildren();
-     if (children != null)
-     {
-       for (String child : children)
-       {
-         String childPath = path.endsWith("/") ? path + child : path + "/" + child;
-         queue.add(new Pair(childPath, startId));
-       }
-     }
-     startId = startId + 1;
-   }
- }
-
- /** Queue entry for the traversal: a node path plus the id of its parent. */
- static class Pair
- {
-   private final String _path;
-   private final int _parentId;
-
-   /**
-    * @param path     path of the node
-    * @param parentId id of the node this entry was enqueued from
-    */
-   public Pair(final String path, final int parentId)
-   {
-     this._parentId = parentId;
-     this._path = path;
-   }
- }
-
- /**
-  * Writes one snapshot node as a single line of {@code name:value} fields
-  * separated by spaces, to the output file if one is open, otherwise to
-  * standard output. All whitespace is removed from the node payload.
-  *
-  * @param id   running id of the node
-  * @param pair path and parent id of the node
-  * @param head the node; its stat fields are printed
-  * @param data node payload, may be null
-  * @throws Exception if writing to the output file fails
-  */
- private static void format(int id, Pair pair, DataNode head, byte[] data) throws Exception
- {
-   String payload = (data == null) ? "" : new String(data).replaceAll("[\\s]+", "");
-
-   StringBuilder line = new StringBuilder();
-   line.append("id").append(fieldDelim).append(id).append(fieldSep)
-       .append("parent").append(fieldDelim).append(pair._parentId).append(fieldSep)
-       .append("path").append(fieldDelim).append(pair._path).append(fieldSep)
-       .append("session").append(fieldDelim).append("0x")
-       .append(Long.toHexString(head.stat.getEphemeralOwner())).append(fieldSep)
-       .append("czxid").append(fieldDelim).append("0x")
-       .append(Long.toHexString(head.stat.getCzxid())).append(fieldSep)
-       .append("ctime").append(fieldDelim).append(head.stat.getCtime()).append(fieldSep)
-       .append("mtime").append(fieldDelim).append(head.stat.getMtime()).append(fieldSep)
-       .append("cmzxid").append(fieldDelim).append("0x")
-       .append(Long.toHexString(head.stat.getMzxid())).append(fieldSep)
-       .append("pzxid").append(fieldDelim).append("0x")
-       .append(Long.toHexString(head.stat.getPzxid())).append(fieldSep)
-       .append("aversion").append(fieldDelim).append(head.stat.getAversion()).append(fieldSep)
-       .append("cversion").append(fieldDelim).append(head.stat.getCversion()).append(fieldSep)
-       .append("version").append(fieldDelim).append(head.stat.getVersion()).append(fieldSep)
-       .append("data").append(fieldDelim).append(payload).append(fieldSep);
-
-   if (bw == null)
-   {
-     System.out.println(line);
-     return;
-   }
-   bw.write(line.toString());
-   bw.newLine();
- }
-
- /**
-  * Reads a transaction log file and writes one formatted line per
-  * transaction, preceded by a header line and followed by an EOF line.
-  * Exits the JVM with status 2 if the file's magic number is wrong.
-  *
-  * @param logfilepath path of the transaction log file
-  * @throws FileNotFoundException if the file cannot be opened
-  * @throws IOException           on read/write errors or a checksum mismatch
-  * @throws EOFException          if the last transaction is incomplete
-  */
- private static void readTransactionLog(String logfilepath)
-     throws FileNotFoundException, IOException, EOFException
- {
-   FileInputStream fis = new FileInputStream(logfilepath);
-   try
-   {
-     BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
-     FileHeader fhdr = new FileHeader();
-     fhdr.deserialize(logStream, "fileheader");
-
-     if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC)
-     {
-       System.err.println("Invalid magic number for " + logfilepath);
-       System.exit(2);
-     }
-
-     writeLine("ZooKeeper Transactional Log File with dbid "
-         + fhdr.getDbid() + " txnlog format version " + fhdr.getVersion());
-
-     int count = 0;
-     while (true)
-     {
-       long crcValue;
-       byte[] bytes;
-       try
-       {
-         crcValue = logStream.readLong("crcvalue");
-         bytes = logStream.readBuffer("txnEntry");
-       }
-       catch (EOFException e)
-       {
-         writeLine("EOF reached after " + count + " txns.");
-         break;
-       }
-       if (bytes.length == 0)
-       {
-         // Since we preallocate, we define EOF to be an
-         // empty transaction
-         writeLine("EOF reached after " + count + " txns.");
-         break;
-       }
-       Checksum crc = new Adler32();
-       crc.update(bytes, 0, bytes.length);
-       if (crcValue != crc.getValue())
-       {
-         throw new IOException("CRC doesn't match " + crcValue + " vs "
-             + crc.getValue());
-       }
-       InputArchive iab = BinaryInputArchive
-           .getArchive(new ByteArrayInputStream(bytes));
-       TxnHeader hdr = new TxnHeader();
-       Record txn = SerializeUtils.deserializeTxn(iab, hdr);
-       writeLine(formatTransaction(hdr, txn));
-
-       if (logStream.readByte("EOR") != 'B')
-       {
-         LOG.error("Last transaction was partial.");
-         throw new EOFException("Last transaction was partial.");
-       }
-       count++;
-     }
-   }
-   finally
-   {
-     // the stream was previously never closed
-     fis.close();
-   }
- }
-
- /** Writes one line to the output file if one is open, otherwise to standard output. */
- private static void writeLine(String line) throws IOException
- {
-   if (bw != null)
-   {
-     bw.write(line);
-     bw.newLine();
-   }
-   else
-   {
-     System.out.println(line);
-   }
- }
-
- /**
-  * Maps a ZooKeeper operation code to its name.
-  *
-  * @param op operation code, compared against the {@code OpCode} constants
-  * @return the operation name, or {@code "unknown " + op} for other values
-  */
- static String op2String(int op)
- {
-   switch (op)
-   {
-   case OpCode.notification:
-     return "notification";
-   case OpCode.create:
-     return "create";
-   case OpCode.delete:
-     return "delete";
-   case OpCode.exists:
-     return "exists";
-   case OpCode.getData:
-     // was misspelled "getDate"
-     return "getData";
-   case OpCode.setData:
-     return "setData";
-   case OpCode.getACL:
-     return "getACL";
-   case OpCode.setACL:
-     return "setACL";
-   case OpCode.getChildren:
-     return "getChildren";
-   case OpCode.getChildren2:
-     return "getChildren2";
-   case OpCode.ping:
-     return "ping";
-   case OpCode.createSession:
-     return "createSession";
-   case OpCode.closeSession:
-     return "closeSession";
-   case OpCode.error:
-     return "error";
-   default:
-     return "unknown " + op;
-   }
- }
-
- /**
-  * Renders one transaction as a single line of {@code name:value} fields:
-  * the header fields first, then every readable bean property of the
-  * transaction record, with a byte-array {@code data} property always last.
-  * Whitespace is removed from all property values.
-  *
-  * @param header transaction header
-  * @param txn    transaction record, may be null
-  * @return the formatted line
-  */
- private static String formatTransaction(TxnHeader header, Record txn)
- {
-   StringBuilder sb = new StringBuilder();
-
-   sb.append("time").append(fieldDelim).append(header.getTime());
-   sb.append(fieldSep).append("session").append(fieldDelim).append("0x")
-       .append(Long.toHexString(header.getClientId()));
-   sb.append(fieldSep).append("cxid").append(fieldDelim).append("0x")
-       .append(Long.toHexString(header.getCxid()));
-   sb.append(fieldSep).append("zxid").append(fieldDelim).append("0x")
-       .append(Long.toHexString(header.getZxid()));
-   sb.append(fieldSep).append("type").append(fieldDelim)
-       .append(op2String(header.getType()));
-   if (txn == null)
-   {
-     return sb.toString();
-   }
-
-   try
-   {
-     byte[] data = null;
-     for (PropertyDescriptor pd : Introspector.getBeanInfo(txn.getClass())
-         .getPropertyDescriptors())
-     {
-       if (pd.getReadMethod() == null || "class".equals(pd.getName()))
-       {
-         continue;
-       }
-       Object value = pd.getReadMethod().invoke(txn);
-       if (pd.getName().equalsIgnoreCase("data")
-           && (value == null || value instanceof byte[]))
-       {
-         // payload is appended after all other properties
-         data = (byte[]) value;
-         continue;
-       }
-       // String.valueOf() keeps a null property value from aborting the
-       // whole record with a NullPointerException.
-       sb.append(fieldSep)
-           .append(pd.getDisplayName())
-           .append(fieldDelim)
-           .append(String.valueOf(value).replaceAll("[\\s]+", ""));
-     }
-     if (data != null)
-     {
-       sb.append(fieldSep).append("data").append(fieldDelim)
-           .append(new String(data).replaceAll("[\\s]+", ""));
-     }
-   }
-   catch (Exception e)
-   {
-     LOG.error(
-         "Error while retrieving bean property values for " + txn.getClass(),
-         e);
-   }
-
-   return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/ZkLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/com/linkedin/helix/tools/ZkLogAnalyzer.java
deleted file mode 100644
index 29a7bf2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/ZkLogAnalyzer.java
+++ /dev/null
@@ -1,491 +0,0 @@
-package com.linkedin.helix.tools;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-
-public class ZkLogAnalyzer
-{
- private static Logger LOG = Logger.getLogger(ZkLogAnalyzer.class);
- private static boolean dump = false; ;
- final static ZNRecordSerializer _deserializer = new ZNRecordSerializer();
-
- /** Per-test counters; all start at zero and are reset by creating a new instance. */
- static class Stats
- {
-   // messages created by the leader session
-   int msgSentCount;
-   int msgSentCount_O2S; // Offline to Slave
-   int msgSentCount_S2M; // Slave to Master
-   int msgSentCount_M2S; // Master to Slave
-   // message node deletions and modifications
-   int msgDeleteCount;
-   int msgModifyCount;
-   // current-state node creations and updates
-   int curStateCreateCount;
-   int curStateUpdateCount;
-   // external-view node creations and updates
-   int extViewCreateCount;
-   int extViewUpdateCount;
- }
-
- /**
-  * Extracts the value of an attribute from a formatted log line.
-  *
-  * The line is split on single whitespace characters; the first token that
-  * starts with {@code attribute} wins and the remainder of that token is
-  * returned.
-  *
-  * @param line      formatted log line, may be null
-  * @param attribute attribute prefix including the delimiter, e.g. {@code "time:"}
-  * @return the text following the prefix, or null if the line is null or no
-  *         token starts with the prefix
-  */
- static String getAttributeValue(String line, String attribute)
- {
-   if (line != null)
-   {
-     for (String token : line.split("\\s"))
-     {
-       if (token.startsWith(attribute))
-       {
-         return token.substring(attribute.length());
-       }
-     }
-   }
-   return null;
- }
-
- /**
-  * Finds the current-state update line with the largest {@code time:} value
-  * inside the closed interval [start, end].
-  *
-  * @param csUpdateLines formatted log lines, each carrying a {@code time:} attribute
-  * @param start         lower bound (inclusive)
-  * @param end           upper bound (inclusive)
-  * @return the matching line; null if there is none and assertions are disabled
-  */
- static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end)
- {
-   String latestLine = null;
-   long latestTime = Long.MIN_VALUE;
-   for (String candidate : csUpdateLines)
-   {
-     long time = Long.parseLong(getAttributeValue(candidate, "time:"));
-     boolean inWindow = time >= start && time <= end;
-     if (!inWindow || time <= latestTime)
-     {
-       continue;
-     }
-     latestTime = time;
-     latestLine = candidate;
-   }
-   assert (latestLine != null) : "No CS update between " + start + " - " + end;
-   return latestLine;
- }
-
- /**
-  * Deserializes the {@code data:} attribute of a formatted log line.
-  *
-  * @param line formatted log line
-  * @return the deserialized record, or null if the line has no {@code data:} attribute
-  */
- static ZNRecord getZNRecord(String line)
- {
-   String payload = getAttributeValue(line, "data:");
-   if (payload == null)
-   {
-     return null;
-   }
-   return (ZNRecord) _deserializer.deserialize(payload.getBytes());
- }
-
- /**
-  * Analyzes ZooKeeper transaction logs of a test run and prints, for every
-  * completed verification, the state-transition latency and message /
-  * current-state / external-view counters.
-  *
-  * Every log file under {@code zkLogDir/version-2} modified after the test
-  * start time is converted with {@link ZKLogFormatter} into
-  * {@code zkLogAnalyzor_zklog.parsed<i>} in the working directory and then
-  * scanned line by line.
-  *
-  * @param args {@code zkLogDir clusterName testStartTime}, the start time
-  *             formatted as {@code yyMMdd_HHmmss_SSS}
-  * @throws Exception if the start time cannot be parsed or a log cannot be read
-  */
- public static void main(String[] args) throws Exception
- {
-   if (args.length != 3)
-   {
-     System.err.println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_HHmmss_SSS)");
-     System.exit(1);
-   }
-
-   System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args));
-
-   String zkLogDir = args[0];
-   String clusterName = args[1];
-   String startTimeStr = args[2];
-   // HH (0-23) rather than hh (1-12): with hh an hour field of "12" is
-   // parsed as midnight.
-   SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_HHmmss_SSS");
-   Date date = formatter.parse(startTimeStr);
-   long startTimeStamp = date.getTime();
-
-   System.out.println(clusterName + " created at " + date);
-   while (zkLogDir.endsWith("/"))
-   {
-     zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1);
-   }
-   if (!zkLogDir.endsWith("/version-2"))
-   {
-     zkLogDir = zkLogDir + "/version-2";
-   }
-   File dir = new File(zkLogDir);
-   File[] zkLogs = dir.listFiles(new FileFilter()
-   {
-     @Override
-     public boolean accept(File file)
-     {
-       return file.isFile() && (file.getName().indexOf("log") != -1);
-     }
-   });
-   if (zkLogs == null)
-   {
-     // listFiles() returns null when the path is not a readable directory
-     System.err.println("Cannot list zk log directory: " + zkLogDir);
-     System.exit(1);
-   }
-
-   // lastModified time -> zkLog
-   TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>();
-   for (File file : zkLogs)
-   {
-     if (file.lastModified() > startTimeStamp)
-     {
-       lastZkLogs.put(file.lastModified(), file.getAbsolutePath());
-     }
-   }
-
-   List<String> parsedZkLogs = new ArrayList<String>();
-   int i = 0;
-   System.out.println("zk logs last modified later than "
-       + new Timestamp(startTimeStamp));
-   for (Map.Entry<Long, String> entry : lastZkLogs.entrySet())
-   {
-     String fileName = entry.getValue();
-     System.out.println(new Timestamp(entry.getKey()) + ": "
-         + (fileName.substring(fileName.lastIndexOf('/') + 1)));
-
-     String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i;
-     i++;
-     ZKLogFormatter.main(new String[] { "log", fileName, parsedFileName });
-     parsedZkLogs.add(parsedFileName);
-   }
-
-   // sessionId -> create liveInstance line
-   Map<String, String> sessionMap = new HashMap<String, String>();
-   // CS update lines in time order
-   List<String> csUpdateLines = new ArrayList<String>();
-   String leaderSession = null;
-   String clusterPath = "/" + clusterName + "/";
-
-   System.out.println();
-   Stats stats = new Stats();
-   long lastTestStartTimestamp = Long.MAX_VALUE;
-   long controllerStartTime = 0;
-   for (String parsedZkLog : parsedZkLogs)
-   {
-     BufferedReader br =
-         new BufferedReader(new InputStreamReader(new FileInputStream(parsedZkLog)));
-     try
-     {
-       String inputLine;
-       while ((inputLine = br.readLine()) != null)
-       {
-         String timestamp = getAttributeValue(inputLine, "time:");
-         if (timestamp == null)
-         {
-           continue;
-         }
-         long timestampVal = Long.parseLong(timestamp);
-         if (timestampVal < startTimeStamp)
-         {
-           continue;
-         }
-
-         if (dump)
-         {
-           String printLine = inputLine.replaceAll("data:.*", "");
-           printLine = new Timestamp(timestampVal) + " "
-               + printLine.substring(printLine.indexOf("session:"));
-           System.err.println(printLine);
-         }
-
-         if (inputLine.contains("/start_disable"))
-         {
-           dump = true;
-         }
-
-         String type = getAttributeValue(inputLine, "type:");
-         String session = getAttributeValue(inputLine, "session:");
-
-         if (inputLine.contains(clusterPath + "CONFIGS/CLUSTER/verify"))
-         {
-           if ("delete".equals(type))
-           {
-             System.out.println(timestamp + ": verify done");
-             System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp);
-             String lastCSUpdateLine =
-                 findLastCSUpdateBetween(csUpdateLines, lastTestStartTimestamp, timestampVal);
-             if (lastCSUpdateLine == null)
-             {
-               // only reachable with assertions disabled
-               System.out.println("No CS update between " + lastTestStartTimestamp
-                   + " - " + timestampVal);
-             }
-             else
-             {
-               long lastCSUpdateTimestamp =
-                   Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:"));
-               System.out.println("Last CS Update:" + lastCSUpdateTimestamp);
-               System.out.println("state transition latency: "
-                   + (lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms");
-               System.out.println("state transition latency since controller start: "
-                   + (lastCSUpdateTimestamp - controllerStartTime) + "ms");
-             }
-             printStats(stats);
-
-             System.out.println();
-             stats = new Stats();
-             lastTestStartTimestamp = Long.MAX_VALUE;
-           }
-         }
-         else if (inputLine.contains(clusterPath + "LIVEINSTANCES/"))
-         {
-           // cluster startup
-           if (timestampVal < lastTestStartTimestamp)
-           {
-             System.out.println("START cluster. SETTING lastTestStartTimestamp to "
-                 + new Timestamp(timestampVal) + "\nline:" + inputLine);
-             lastTestStartTimestamp = timestampVal;
-           }
-
-           LiveInstance liveInstance = new LiveInstance(getZNRecord(inputLine));
-           sessionMap.put(session, inputLine);
-           System.out.println(new Timestamp(timestampVal) + ": create LIVEINSTANCE "
-               + liveInstance.getInstanceName());
-         }
-         else if (inputLine.contains("closeSession"))
-         {
-           // kill any instance
-           if (sessionMap.containsKey(session))
-           {
-             if (timestampVal < lastTestStartTimestamp)
-             {
-               System.out.println("KILL node. SETTING lastTestStartTimestamp to "
-                   + timestampVal + " line:" + inputLine);
-               lastTestStartTimestamp = timestampVal;
-             }
-             LiveInstance liveInstance =
-                 new LiveInstance(getZNRecord(sessionMap.get(session)));
-
-             System.out.println(new Timestamp(timestampVal) + ": close session "
-                 + liveInstance.getInstanceName());
-             dump = true;
-           }
-         }
-         else if (inputLine.contains(clusterPath + "CONFIGS/PARTICIPANT"))
-         {
-           // disable a partition
-           if ("setData".equals(type) && inputLine.contains("HELIX_DISABLED_PARTITION"))
-           {
-             if (timestampVal < lastTestStartTimestamp)
-             {
-               System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to "
-                   + timestampVal + " line:" + inputLine);
-               lastTestStartTimestamp = timestampVal;
-             }
-           }
-         }
-         else if (inputLine.contains(clusterPath + "CONTROLLER/LEADER"))
-         {
-           LiveInstance liveInstance = new LiveInstance(getZNRecord(inputLine));
-           leaderSession = session;
-           controllerStartTime = timestampVal;
-           sessionMap.put(session, inputLine);
-           System.out.println(new Timestamp(timestampVal) + ": create LEADER "
-               + liveInstance.getInstanceName());
-         }
-         else if (inputLine.contains(clusterPath) && inputLine.contains("/CURRENTSTATES/"))
-         {
-           if ("create".equals(type))
-           {
-             stats.curStateCreateCount++;
-           }
-           else if ("setData".equals(type))
-           {
-             String path = getAttributeValue(inputLine, "path:");
-             csUpdateLines.add(inputLine);
-             stats.curStateUpdateCount++;
-             System.out.println("Update currentstate:"
-                 + new Timestamp(timestampVal) + ":" + timestamp + " path:"
-                 + path);
-           }
-         }
-         else if (inputLine.contains(clusterPath + "EXTERNALVIEW/"))
-         {
-           // only external-view writes made by the leader are counted
-           if (session != null && session.equals(leaderSession))
-           {
-             if ("create".equals(type))
-             {
-               stats.extViewCreateCount++;
-             }
-             else if ("setData".equals(type))
-             {
-               stats.extViewUpdateCount++;
-             }
-           }
-         }
-         else if (inputLine.contains(clusterPath) && inputLine.contains("/MESSAGES/"))
-         {
-           if ("create".equals(type))
-           {
-             Message msg = new Message(getZNRecord(inputLine));
-             if (session != null && session.equals(leaderSession)
-                 && msg.getMsgType().equals("STATE_TRANSITION")
-                 && msg.getMsgState() == MessageState.NEW)
-             {
-               stats.msgSentCount++;
-
-               if (msg.getFromState().equals("OFFLINE")
-                   && msg.getToState().equals("SLAVE"))
-               {
-                 stats.msgSentCount_O2S++;
-               }
-               else if (msg.getFromState().equals("SLAVE")
-                   && msg.getToState().equals("MASTER"))
-               {
-                 stats.msgSentCount_S2M++;
-               }
-               else if (msg.getFromState().equals("MASTER")
-                   && msg.getToState().equals("SLAVE"))
-               {
-                 stats.msgSentCount_M2S++;
-               }
-             }
-           }
-           else if ("setData".equals(type))
-           {
-             stats.msgModifyCount++;
-           }
-           else if ("delete".equals(type))
-           {
-             stats.msgDeleteCount++;
-           }
-         }
-       } // end of [br.readLine()) != null]
-     }
-     finally
-     {
-       // the reader was previously never closed
-       br.close();
-     }
-   }
- }
-
- /** Prints the counters collected for one test run. */
- private static void printStats(Stats stats)
- {
-   System.out.println("Create MSG\t" + stats.msgSentCount + "\t"
-       + stats.msgSentCount_O2S + "\t" + stats.msgSentCount_S2M + "\t"
-       + stats.msgSentCount_M2S);
-   System.out.println("Modify MSG\t" + stats.msgModifyCount);
-   System.out.println("Delete MSG\t" + stats.msgDeleteCount);
-   System.out.println("Create CS\t" + stats.curStateCreateCount);
-   System.out.println("Update CS\t" + stats.curStateUpdateCount);
-   System.out.println("Create EV\t" + stats.extViewCreateCount);
-   System.out.println("Update EV\t" + stats.extViewUpdateCount);
- }
-}