You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[25/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZkLogCSVFormatter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogCSVFormatter.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogCSVFormatter.java
new file mode 100644
index 0000000..d01a241
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkLogCSVFormatter.java
@@ -0,0 +1,445 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.util.HelixUtil;
+
+
+public class ZkLogCSVFormatter
+{
+ private static final ZNRecordSerializer _deserializer = new ZNRecordSerializer();
+ private static String _fieldDelim = ",";
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length != 2)
+ {
+ System.err.println("USAGE: ZkLogCSVFormatter log_file output_dir");
+ System.exit(2);
+ }
+ File outputDir = new File(args[1]);
+ if (!outputDir.exists() || !outputDir.isDirectory())
+ {
+ System.err.println(outputDir.getAbsolutePath() + " does NOT exist or is NOT a directory");
+ System.exit(2);
+ }
+ format(args[0], args[1]);
+ }
+
+ private static void formatter(BufferedWriter bw, String... args)
+ {
+ StringBuffer sb = new StringBuffer();
+
+ if (args.length == 0)
+ {
+ return;
+ }
+ else
+ {
+ sb.append(args[0]);
+ for (int i = 1; i < args.length; i++)
+ {
+ sb.append(_fieldDelim).append(args[i]);
+ }
+ }
+
+ try
+ {
+ bw.write(sb.toString());
+ bw.newLine();
+ // System.out.println(sb.toString());
+ }
+ catch (IOException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ private static String getAttributeValue(String line, String attribute)
+ {
+ String[] parts = line.split("\\s");
+ if (parts != null && parts.length > 0)
+ {
+ for (int i = 0; i < parts.length; i++)
+ {
+ if (parts[i].startsWith(attribute))
+ {
+ String val = parts[i].substring(attribute.length());
+ return val;
+ }
+ }
+ }
+ return null;
+ }
+
+ private static void format(String logfilepath, String outputDir) throws FileNotFoundException
+ {
+ try
+ {
+ // input file
+ FileInputStream fis = new FileInputStream(logfilepath);
+ BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+
+ // output files
+ FileOutputStream isFos = new FileOutputStream(outputDir + "/" + "idealState.csv");
+ BufferedWriter isBw = new BufferedWriter(new OutputStreamWriter(isFos));
+
+ FileOutputStream cfgFos = new FileOutputStream(outputDir + "/" + "config.csv");
+ BufferedWriter cfgBw = new BufferedWriter(new OutputStreamWriter(cfgFos));
+
+ FileOutputStream evFos = new FileOutputStream(outputDir + "/" + "externalView.csv");
+ BufferedWriter evBw = new BufferedWriter(new OutputStreamWriter(evFos));
+
+ FileOutputStream smdCntFos =
+ new FileOutputStream(outputDir + "/" + "stateModelDefStateCount.csv");
+ BufferedWriter smdCntBw = new BufferedWriter(new OutputStreamWriter(smdCntFos));
+
+ FileOutputStream smdNextFos =
+ new FileOutputStream(outputDir + "/" + "stateModelDefStateNext.csv");
+ BufferedWriter smdNextBw = new BufferedWriter(new OutputStreamWriter(smdNextFos));
+
+ FileOutputStream csFos = new FileOutputStream(outputDir + "/" + "currentState.csv");
+ BufferedWriter csBw = new BufferedWriter(new OutputStreamWriter(csFos));
+
+ FileOutputStream msgFos = new FileOutputStream(outputDir + "/" + "messages.csv");
+ BufferedWriter msgBw = new BufferedWriter(new OutputStreamWriter(msgFos));
+
+ FileOutputStream hrPerfFos = new FileOutputStream(outputDir + "/" + "healthReportDefaultPerfCounters.csv");
+ BufferedWriter hrPerfBw = new BufferedWriter(new OutputStreamWriter(hrPerfFos));
+
+ FileOutputStream liFos =
+ new FileOutputStream(outputDir + "/" + "liveInstances.csv");
+ BufferedWriter liBw = new BufferedWriter(new OutputStreamWriter(liFos));
+
+ formatter(cfgBw, "timestamp", "instanceName", "host", "port", "enabled");
+ formatter(isBw,
+ "timestamp",
+ "resourceName",
+ "partitionNumber",
+ "mode",
+ "partition",
+ "instanceName",
+ "priority");
+ formatter(evBw, "timestamp", "resourceName", "partition", "instanceName", "state");
+ formatter(smdCntBw, "timestamp", "stateModel", "state", "count");
+ formatter(smdNextBw, "timestamp", "stateModel", "from", "to", "next");
+ formatter(liBw, "timestamp", "instanceName", "sessionId", "Operation");
+ formatter(csBw,
+ "timestamp",
+ "resourceName",
+ "partition",
+ "instanceName",
+ "sessionId",
+ "state");
+ formatter(msgBw,
+ "timestamp",
+ "resourceName",
+ "partition",
+ "instanceName",
+ "sessionId",
+ "from",
+ "to",
+ "messageType",
+ "messageState");
+ formatter(hrPerfBw,
+ "timestamp",
+ "instanceName",
+ "availableCPUs",
+ "averageSystemLoad",
+ "freeJvmMemory",
+ "freePhysicalMemory",
+ "totalJvmMemory");
+
+ Map<String, ZNRecord> liveInstanceSessionMap = new HashMap<String, ZNRecord>();
+
+ int pos;
+ String inputLine;
+ while ((inputLine = br.readLine()) != null)
+ {
+ if (inputLine.indexOf("CONFIGS") != -1)
+ {
+ pos = inputLine.indexOf("CONFIGS");
+ pos = inputLine.indexOf("data:{", pos);
+ if (pos != -1)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ ZNRecord record =
+ (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ .getBytes());
+
+ formatter(cfgBw,
+ timestamp,
+ record.getId(),
+ record.getSimpleField("HOST"),
+ record.getSimpleField("PORT"),
+ record.getSimpleField("ENABLED"));
+
+ }
+ }
+ else if (inputLine.indexOf("IDEALSTATES") != -1)
+ {
+ pos = inputLine.indexOf("IDEALSTATES");
+ pos = inputLine.indexOf("data:{", pos);
+ if (pos != -1)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ ZNRecord record =
+ (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ .getBytes());
+ // System.out.println("record=" + record);
+ for (String partition : record.getListFields().keySet())
+ {
+ List<String> preferenceList = record.getListFields().get(partition);
+ for (int i = 0; i < preferenceList.size(); i++)
+ {
+ String instance = preferenceList.get(i);
+ formatter(isBw,
+ timestamp,
+ record.getId(),
+ record.getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString()),
+ record.getSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString()),
+ partition,
+ instance,
+ Integer.toString(i));
+ }
+ }
+ }
+ }
+ else if (inputLine.indexOf("LIVEINSTANCES") != -1)
+ {
+ pos = inputLine.indexOf("LIVEINSTANCES");
+ pos = inputLine.indexOf("data:{", pos);
+ if (pos != -1)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ ZNRecord record =
+ (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ .getBytes());
+ formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"), "ADD");
+ String zkSessionId = getAttributeValue(inputLine, "session:");
+ if (zkSessionId == null)
+ {
+ System.err.println("no zk session id associated with the adding of live instance: "
+ + inputLine);
+ }
+ else
+ {
+ liveInstanceSessionMap.put(zkSessionId, record);
+ }
+ }
+
+ }
+ else if (inputLine.indexOf("EXTERNALVIEW") != -1)
+ {
+ pos = inputLine.indexOf("EXTERNALVIEW");
+ pos = inputLine.indexOf("data:{", pos);
+ if (pos != -1)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ ZNRecord record =
+ (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ .getBytes());
+ // System.out.println("record=" + record);
+ for (String partition : record.getMapFields().keySet())
+ {
+ Map<String, String> stateMap = record.getMapFields().get(partition);
+ for (String instance : stateMap.keySet())
+ {
+ String state = stateMap.get(instance);
+ formatter(evBw, timestamp, record.getId(), partition, instance, state);
+ }
+ }
+ }
+ }
+ else if (inputLine.indexOf("STATEMODELDEFS") != -1)
+ {
+ pos = inputLine.indexOf("STATEMODELDEFS");
+ pos = inputLine.indexOf("data:{", pos);
+ if (pos != -1)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ ZNRecord record =
+ (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ .getBytes());
+
+ for (String stateInfo : record.getMapFields().keySet())
+ {
+ if (stateInfo.endsWith(".meta"))
+ {
+ Map<String, String> metaMap = record.getMapFields().get(stateInfo);
+ formatter(smdCntBw,
+ timestamp,
+ record.getId(),
+ stateInfo.substring(0, stateInfo.indexOf('.')),
+ metaMap.get("count"));
+ }
+ else if (stateInfo.endsWith(".next"))
+ {
+ Map<String, String> nextMap = record.getMapFields().get(stateInfo);
+ for (String destState : nextMap.keySet())
+ {
+ formatter(smdNextBw,
+ timestamp,
+ record.getId(),
+ stateInfo.substring(0, stateInfo.indexOf('.')),
+ destState,
+ nextMap.get(destState));
+ }
+ }
+ }
+ }
+ }
+ else if (inputLine.indexOf("CURRENTSTATES") != -1)
+ {
+ pos = inputLine.indexOf("CURRENTSTATES");
+ pos = inputLine.indexOf("data:{", pos);
+ if (pos != -1)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ ZNRecord record =
+ (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ .getBytes());
+ // System.out.println("record=" + record);
+ for (String partition : record.getMapFields().keySet())
+ {
+ Map<String, String> stateMap = record.getMapFields().get(partition);
+ String path = getAttributeValue(inputLine, "path:");
+ if (path != null)
+ {
+ String instance = HelixUtil.getInstanceNameFromPath(path);
+ formatter(csBw,
+ timestamp,
+ record.getId(),
+ partition,
+ instance,
+ record.getSimpleField("SESSION_ID"),
+ stateMap.get("CURRENT_STATE"));
+ }
+ }
+ }
+ }
+ else if (inputLine.indexOf("MESSAGES") != -1)
+ {
+ pos = inputLine.indexOf("MESSAGES");
+ pos = inputLine.indexOf("data:{", pos);
+ if (pos != -1)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ ZNRecord record =
+ (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ .getBytes());
+
+ formatter(msgBw,
+ timestamp,
+ record.getSimpleField("RESOURCE_NAME"),
+ record.getSimpleField("PARTITION_NAME"),
+ record.getSimpleField("TGT_NAME"),
+ record.getSimpleField("TGT_SESSION_ID"),
+ record.getSimpleField("FROM_STATE"),
+ record.getSimpleField("TO_STATE"),
+ record.getSimpleField("MSG_TYPE"),
+ record.getSimpleField("MSG_STATE"));
+ }
+
+ }
+ else if (inputLine.indexOf("closeSession") != -1)
+ {
+ String zkSessionId = getAttributeValue(inputLine, "session:");
+ if (zkSessionId == null)
+ {
+ System.err.println("no zk session id associated with the closing of zk session: "
+ + inputLine);
+ }
+ else
+ {
+ ZNRecord record = liveInstanceSessionMap.remove(zkSessionId);
+ // System.err.println("zkSessionId:" + zkSessionId + ", record:" + record);
+ if (record != null)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ formatter(liBw,
+ timestamp,
+ record.getId(),
+ record.getSimpleField("SESSION_ID"),
+ "DELETE");
+ }
+ }
+ }
+ else if (inputLine.indexOf("HEALTHREPORT/defaultPerfCounters") != -1)
+ {
+ pos = inputLine.indexOf("HEALTHREPORT/defaultPerfCounters");
+ pos = inputLine.indexOf("data:{", pos);
+ if (pos != -1)
+ {
+ String timestamp = getAttributeValue(inputLine, "time:");
+ ZNRecord record =
+ (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+ .getBytes());
+
+ String path = getAttributeValue(inputLine, "path:");
+ if (path != null)
+ {
+ String instance = HelixUtil.getInstanceNameFromPath(path);
+ formatter(hrPerfBw,
+ timestamp,
+ instance,
+ record.getSimpleField("availableCPUs"),
+ record.getSimpleField("averageSystemLoad"),
+ record.getSimpleField("freeJvmMemory"),
+ record.getSimpleField("freePhysicalMemory"),
+ record.getSimpleField("totalJvmMemory"));
+ }
+ }
+ }
+ }
+
+ br.close();
+ isBw.close();
+ cfgBw.close();
+ evBw.close();
+ smdCntBw.close();
+ smdNextBw.close();
+ csBw.close();
+ msgBw.close();
+ liBw.close();
+ hrPerfBw.close();
+ }
+ catch (Exception e)
+ {
+ System.err.println("Error: " + e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZnodeOpArg.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZnodeOpArg.java b/helix-core/src/main/java/org/apache/helix/tools/ZnodeOpArg.java
new file mode 100644
index 0000000..c492ad2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZnodeOpArg.java
@@ -0,0 +1,181 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
+
+
+public class ZnodeOpArg
+{
+ public String _znodePath;
+ public ZnodePropertyType _propertyType;
+
+ /**
+ * "+" for update/create if not exist
+ * '-' for remove
+ * "==" for test equals
+ * "!=" for test not equal
+ */
+ public String _operation;
+ public String _key;
+ public ZnodeValue _updateValue;
+
+ public ZnodeOpArg()
+ {
+ }
+
+ /**
+ * verify simple/list/map field: no update value
+ * @param znodePath
+ * @param type
+ * @param op
+ * @param key
+ */
+ public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key)
+ {
+ this(znodePath, type, op, key, new ZnodeValue());
+ }
+
+ /**
+ * verify znode: no update value
+ * @param znodePath
+ * @param type
+ * @param op
+ */
+ public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op)
+ {
+ this(znodePath, type, op, null, new ZnodeValue());
+ }
+
+ /**
+ * simple field change
+ * @param znodePath
+ * @param type
+ * @param op
+ * @param key
+ * @param update
+ */
+ public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key, String update)
+ {
+ this(znodePath, type, op, key, new ZnodeValue(update));
+ }
+
+ /**
+ * list field change
+ * @param znodePath
+ * @param type
+ * @param op
+ * @param key
+ * @param update
+ */
+ public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key, List<String> update)
+ {
+ this(znodePath, type, op, key, new ZnodeValue(update));
+ }
+
+ /**
+ * map field change
+ * @param znodePath
+ * @param type
+ * @param op
+ * @param key
+ * @param update
+ */
+ public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key, Map<String, String> update)
+ {
+ this(znodePath, type, op, key, new ZnodeValue(update));
+ }
+
+ /**
+ * znode change
+ * @param znodePath
+ * @param type
+ * @param op
+ * @param key
+ * @param update
+ */
+ public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, ZNRecord update)
+ {
+ this(znodePath, type, op, null, new ZnodeValue(update));
+ }
+
+ /**
+ *
+ * @param znodePath
+ * @param type
+ * @param op
+ * @param key
+ * @param update
+ */
+ public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key, ZnodeValue update)
+ {
+ _znodePath = znodePath;
+ _propertyType = type;
+ _operation = op;
+ _key = key;
+ _updateValue = update;
+ }
+
+ @Override
+ public String toString()
+ {
+ String ret = "={\"" +
+ _znodePath + "\", " + _propertyType + "/" + _key + " " + _operation + " " +
+ _updateValue + "}";
+ return ret;
+ }
+
+
+ // TODO temp test; remove it
+ /*
+ public static void main(String[] args)
+ {
+ // null modification command
+ ZnodeOpArg command = new ZnodeOpArg();
+ System.out.println(command);
+
+ // simple modification command
+ command = new ZnodeOpArg("/testPath", ZnodePropertyType.SIMPLE, "+", "key1", "simpleValue1");
+ System.out.println(command);
+
+ // list modification command
+ List<String> list = new ArrayList<String>();
+ list.add("listValue1");
+ list.add("listValue2");
+ command = new ZnodeOpArg("/testPath", ZnodePropertyType.LIST, "+", "key1", list);
+ System.out.println(command);
+
+ // map modification command
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("mapKey1", "mapValue1");
+ map.put("mapKey2", "mapValue2");
+ command = new ZnodeOpArg("/testPath", ZnodePropertyType.MAP, "+", "key1", map);
+ System.out.println(command);
+
+ // map modification command
+ ZNRecord record = new ZNRecord("znrecord");
+ record.setSimpleField("key1", "simpleValue1");
+ record.setListField("key1", list);
+ record.setMapField("key1", map);
+ command = new ZnodeOpArg("/testPath", ZnodePropertyType.ZNODE, "+", record);
+ System.out.println(command);
+ }
+ */
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZnodeValue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZnodeValue.java b/helix-core/src/main/java/org/apache/helix/tools/ZnodeValue.java
new file mode 100644
index 0000000..21fb271
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZnodeValue.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+
+public class ZnodeValue
+{
+ public String _singleValue;
+ public List<String> _listValue;
+ public Map<String, String> _mapValue;
+ public ZNRecord _znodeValue;
+
+ public ZnodeValue()
+ {
+ }
+
+ public ZnodeValue(String value)
+ {
+ _singleValue = value;
+ }
+
+ public ZnodeValue(List<String> value)
+ {
+ _listValue = value;
+ }
+
+ public ZnodeValue(Map<String, String> value)
+ {
+ _mapValue = value;
+ }
+
+ public ZnodeValue(ZNRecord value)
+ {
+ _znodeValue = value;
+ }
+
+ @Override
+ public String toString()
+ {
+ if (_singleValue != null)
+ {
+ return _singleValue;
+ }
+ else if (_listValue != null)
+ {
+ return _listValue.toString();
+ }
+ else if (_mapValue != null)
+ {
+ return _mapValue.toString();
+ }
+ else if (_znodeValue != null)
+ {
+ return _znodeValue.toString();
+ }
+
+ return "null";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/package-info.java b/helix-core/src/main/java/org/apache/helix/tools/package-info.java
new file mode 100644
index 0000000..ccf303c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix tools classes
+ *
+ */
+package org.apache.helix.tools;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
new file mode 100644
index 0000000..62c858f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -0,0 +1,181 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+
+public final class HelixUtil
+{
+ private HelixUtil()
+ {
+ }
+
+ public static String getPropertyPath(String clusterName, PropertyType type)
+ {
+ return "/" + clusterName + "/" + type.toString();
+ }
+
+ public static String getInstancePropertyPath(String clusterName, String instanceName,
+ PropertyType type)
+ {
+ return getPropertyPath(clusterName, PropertyType.INSTANCES) + "/" + instanceName + "/"
+ + type.toString();
+ }
+
+ public static String getInstancePath(String clusterName, String instanceName)
+ {
+ return getPropertyPath(clusterName, PropertyType.INSTANCES) + "/" + instanceName;
+ }
+
+ public static String getIdealStatePath(String clusterName, String resourceName)
+ {
+ return getPropertyPath(clusterName, PropertyType.IDEALSTATES) + "/" + resourceName;
+ }
+
+ public static String getIdealStatePath(String clusterName)
+ {
+ return getPropertyPath(clusterName, PropertyType.IDEALSTATES);
+ }
+
+ public static String getLiveInstancesPath(String clusterName)
+ {
+ return getPropertyPath(clusterName, PropertyType.LIVEINSTANCES);
+ }
+
+ // public static String getConfigPath(String clusterName)
+ // {
+ // return getPropertyPath(clusterName, PropertyType.PARTICIPANT_CONFIGS);
+ // }
+
+ // public static String getConfigPath(String clusterName, String instanceName)
+ // {
+ // return getConfigPath(clusterName) + "/" + instanceName;
+ // }
+
+ public static String getMessagePath(String clusterName, String instanceName)
+ {
+ return getInstancePropertyPath(clusterName, instanceName, PropertyType.MESSAGES);
+ }
+
+ public static String getCurrentStateBasePath(String clusterName, String instanceName)
+ {
+ return getInstancePropertyPath(clusterName, instanceName, PropertyType.CURRENTSTATES);
+ }
+
+ /**
+ * Even though this is simple we want to have the mechanism of bucketing the
+ * partitions. If we have P partitions and N nodes with K replication factor
+ * and D databases. Then on each node we will have (P/N)*K*D partitions. And
+ * cluster manager neeeds to maintain watch on each of these nodes for every
+ * node. So over all cluster manager will have P*K*D watches which can be
+ * quite large given that we over partition.
+ *
+ * The other extreme is having one znode per storage per database. This will
+ * result in N*D watches which is good. But data in every node might become
+ * really big since it has to save partition
+ *
+ * Ideally we want to balance between the two models
+ *
+ */
+ public static String getCurrentStatePath(String clusterName, String instanceName,
+ String sessionId, String stateUnitKey)
+ {
+ return getInstancePropertyPath(clusterName, instanceName, PropertyType.CURRENTSTATES) + "/"
+ + sessionId + "/" + stateUnitKey;
+ }
+
+ public static String getExternalViewPath(String clusterName)
+ {
+ return getPropertyPath(clusterName, PropertyType.EXTERNALVIEW);
+ }
+
+ public static String getStateModelDefinitionPath(String clusterName)
+ {
+ return getPropertyPath(clusterName, PropertyType.STATEMODELDEFS);
+ }
+
+ public static String getExternalViewPath(String clusterName, String resourceName)
+ {
+ return getPropertyPath(clusterName, PropertyType.EXTERNALVIEW) + "/" + resourceName;
+ }
+
+ public static String getLiveInstancePath(String clusterName, String instanceName)
+ {
+ return getPropertyPath(clusterName, PropertyType.LIVEINSTANCES) + "/" + instanceName;
+ }
+
+ public static String getMemberInstancesPath(String clusterName)
+ {
+ return getPropertyPath(clusterName, PropertyType.INSTANCES);
+ }
+
+ public static String getErrorsPath(String clusterName, String instanceName)
+ {
+ return getInstancePropertyPath(clusterName, instanceName, PropertyType.ERRORS);
+ }
+
+ public static String getStatusUpdatesPath(String clusterName, String instanceName)
+ {
+ return getInstancePropertyPath(clusterName, instanceName, PropertyType.STATUSUPDATES);
+ }
+
+ public static String getHealthPath(String clusterName, String instanceName)
+ {
+ return PropertyPathConfig.getPath(PropertyType.HEALTHREPORT, clusterName, instanceName);
+ }
+
+ public static String getPersistentStatsPath(String clusterName)
+ {
+ return PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
+ }
+
+ public static String getAlertsPath(String clusterName)
+ {
+ return PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
+ }
+
+ public static String getAlertStatusPath(String clusterName)
+ {
+ return PropertyPathConfig.getPath(PropertyType.ALERT_STATUS, clusterName);
+ }
+
+ public static String getInstanceNameFromPath(String path)
+ {
+ // path structure
+ // /<cluster_name>/instances/<instance_name>/[currentStates/messages]
+ if (path.contains("/" + PropertyType.INSTANCES + "/"))
+ {
+ String[] split = path.split("\\/");
+ if (split.length > 3)
+ {
+ return split[3];
+ }
+ }
+ return null;
+ }
+
+ // distributed cluster controller
+ public static String getControllerPath(String clusterName)
+ {
+ return getPropertyPath(clusterName, PropertyType.CONTROLLER);
+ }
+
+ public static String getControllerPropertyPath(String clusterName, PropertyType type)
+ {
+ return PropertyPathConfig.getPath(type, clusterName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
new file mode 100644
index 0000000..d3060aa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
@@ -0,0 +1,629 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.Error;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Util class to create statusUpdates ZK records and error ZK records. These message
+ * records are for diagnostics only, and they are stored on the "StatusUpdates" and
+ * "errors" ZNodes in the zookeeper instances.
+ *
+ *
+ * */
+public class StatusUpdateUtil
+{
+ static Logger _logger = Logger.getLogger(StatusUpdateUtil.class);
+
+ public static class Transition implements Comparable<Transition>
+ {
+ private final String _msgID;
+ private final long _timeStamp;
+ private final String _from;
+ private final String _to;
+
+ public Transition(String msgID, long timeStamp, String from, String to)
+ {
+ this._msgID = msgID;
+ this._timeStamp = timeStamp;
+ this._from = from;
+ this._to = to;
+ }
+
+ @Override
+ public int compareTo(Transition t)
+ {
+ if (_timeStamp < t._timeStamp)
+ return -1;
+ else if (_timeStamp > t._timeStamp)
+ return 1;
+ else
+ return 0;
+ }
+
+ public boolean equals(Transition t)
+ {
+ return (_timeStamp == t._timeStamp && _from.equals(t._from) && _to.equals(t._to));
+ }
+
+ public String getFromState()
+ {
+ return _from;
+ }
+
+ public String getToState()
+ {
+ return _to;
+ }
+
+ public String getMsgID()
+ {
+ return _msgID;
+ }
+
+ @Override
+ public String toString()
+ {
+ return _msgID + ":" + _timeStamp + ":" + _from + "->" + _to;
+ }
+ }
+
+ public static enum TaskStatus
+ {
+ UNKNOWN, NEW, SCHEDULED, INVOKING, COMPLETED, FAILED
+ }
+
+ public static class StatusUpdateContents
+ {
+ private final List<Transition> _transitions;
+ private final Map<String, TaskStatus> _taskMessages;
+
+ private StatusUpdateContents(List<Transition> transitions,
+ Map<String, TaskStatus> taskMessages)
+ {
+ this._transitions = transitions;
+ this._taskMessages = taskMessages;
+ }
+
+ public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
+ String instance,
+ String resourceGroup,
+ String partition)
+ {
+ return getStatusUpdateContents(accessor, instance, resourceGroup, null, partition);
+ }
+
+ // TODO: We should build a map and return the key instead of searching
+ // everytime
+ // for an (instance, resourceGroup, session, partition) tuple.
+ // But such a map is very similar to what exists in ZNRecord
+ // passing null for sessionID results in searching across all sessions
+ public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
+ String instance,
+ String resourceGroup,
+ String sessionID,
+ String partition)
+ {
+ Builder keyBuilder = accessor.keyBuilder();
+
+ List<ZNRecord> instances =
+ HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
+ List<ZNRecord> partitionRecords = new ArrayList<ZNRecord>();
+ for (ZNRecord znRecord : instances)
+ {
+ String instanceName = znRecord.getId();
+ if (!instanceName.equals(instance))
+ {
+ continue;
+ }
+
+ List<String> sessions = accessor.getChildNames(keyBuilder.sessions(instanceName));
+ for (String session : sessions)
+ {
+ if (sessionID != null && !session.equals(sessionID))
+ {
+ continue;
+ }
+
+ List<String> resourceGroups =
+ accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName,
+ session));
+ for (String resourceGroupName : resourceGroups)
+ {
+ if (!resourceGroupName.equals(resourceGroup))
+ {
+ continue;
+ }
+
+ List<String> partitionStrings =
+ accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName,
+ session,
+ resourceGroupName));
+
+ for (String partitionString : partitionStrings)
+ {
+ ZNRecord partitionRecord =
+ accessor.getProperty(keyBuilder.stateTransitionStatus(instanceName,
+ session,
+ resourceGroupName,
+ partitionString))
+ .getRecord();
+ if (!partitionString.equals(partition))
+ {
+ continue;
+ }
+ partitionRecords.add(partitionRecord);
+ }
+ }
+ }
+ }
+
+ return new StatusUpdateContents(getSortedTransitions(partitionRecords),
+ getTaskMessages(partitionRecords));
+ }
+
+ public List<Transition> getTransitions()
+ {
+ return _transitions;
+ }
+
+ public Map<String, TaskStatus> getTaskMessages()
+ {
+ return _taskMessages;
+ }
+
+ // input: List<ZNRecord> corresponding to (instance, database,
+ // partition) tuples across all sessions
+ // return list of transitions sorted from earliest to latest
+ private static List<Transition> getSortedTransitions(List<ZNRecord> partitionRecords)
+ {
+ List<Transition> transitions = new ArrayList<Transition>();
+ for (ZNRecord partition : partitionRecords)
+ {
+ Map<String, Map<String, String>> mapFields = partition.getMapFields();
+ for (String key : mapFields.keySet())
+ {
+ if (key.startsWith("MESSAGE"))
+ {
+ Map<String, String> m = mapFields.get(key);
+ long createTimeStamp = 0;
+ try
+ {
+ createTimeStamp = Long.parseLong(m.get("CREATE_TIMESTAMP"));
+ }
+ catch (Exception e)
+ {
+ }
+ transitions.add(new Transition(m.get("MSG_ID"),
+ createTimeStamp,
+ m.get("FROM_STATE"),
+ m.get("TO_STATE")));
+ }
+ }
+ }
+ Collections.sort(transitions);
+ return transitions;
+ }
+
+ private static Map<String, TaskStatus> getTaskMessages(List<ZNRecord> partitionRecords)
+ {
+ Map<String, TaskStatus> taskMessages = new HashMap<String, TaskStatus>();
+ for (ZNRecord partition : partitionRecords)
+ {
+ Map<String, Map<String, String>> mapFields = partition.getMapFields();
+ // iterate over the task status updates in the order they occurred
+ // so that the last status can be recorded
+ for (String key : mapFields.keySet())
+ {
+ if (key.contains("STATE_TRANSITION"))
+ {
+ Map<String, String> m = mapFields.get(key);
+ String id = m.get("MSG_ID");
+ String statusString = m.get("AdditionalInfo");
+ TaskStatus status = TaskStatus.UNKNOWN;
+ if (statusString.contains("scheduled"))
+ status = TaskStatus.SCHEDULED;
+ else if (statusString.contains("invoking"))
+ status = TaskStatus.INVOKING;
+ else if (statusString.contains("completed"))
+ status = TaskStatus.COMPLETED;
+
+ taskMessages.put(id, status);
+ }
+ }
+ }
+ return taskMessages;
+ }
+ }
+
+ public enum Level
+ {
+ HELIX_ERROR, HELIX_WARNING, HELIX_INFO
+ }
+
+ /**
+ * Creates an empty ZNRecord as the statusUpdate/error record
+ *
+ * @param id
+ */
+ public ZNRecord createEmptyStatusUpdateRecord(String id)
+ {
+ return new ZNRecord(id);
+ }
+
+ /**
+ * Create a ZNRecord for a message, which stores the content of the message (stored in
+ * simple fields) into the ZNRecord mapFields. In this way, the message update can be
+ * merged with the previous status update record in the zookeeper. See ZNRecord.merge()
+ * for more details.
+ * */
+ ZNRecord createMessageLogRecord(Message message)
+ {
+ ZNRecord result = new ZNRecord(getStatusUpdateRecordName(message));
+ String mapFieldKey = "MESSAGE " + message.getMsgId();
+ result.setMapField(mapFieldKey, new TreeMap<String, String>());
+
+ // Store all the simple fields of the message in the new ZNRecord's map
+ // field.
+ for (String simpleFieldKey : message.getRecord().getSimpleFields().keySet())
+ {
+ result.getMapField(mapFieldKey).put(simpleFieldKey,
+ message.getRecord()
+ .getSimpleField(simpleFieldKey));
+ }
+ if (message.getResultMap() != null)
+ {
+ result.setMapField("MessageResult", message.getResultMap());
+ }
+ return result;
+ }
+
+ Map<String, String> _recordedMessages = new ConcurrentHashMap<String, String>();
+
+ /**
+ * Create a statusupdate that is related to a cluster manager message.
+ *
+ * @param message
+ * the related cluster manager message
+ * @param level
+ * the error level
+ * @param classInfo
+ * class info about the class that reports the status update
+ * @param additional
+ * info the additional debug information
+ */
+ public ZNRecord createMessageStatusUpdateRecord(Message message,
+ Level level,
+ Class classInfo,
+ String additionalInfo)
+ {
+ ZNRecord result = createEmptyStatusUpdateRecord(getStatusUpdateRecordName(message));
+ Map<String, String> contentMap = new TreeMap<String, String>();
+
+ contentMap.put("Message state", message.getMsgState().toString());
+ contentMap.put("AdditionalInfo", additionalInfo);
+ contentMap.put("Class", classInfo.toString());
+ contentMap.put("MSG_ID", message.getMsgId());
+
+ DateFormat formatter = new SimpleDateFormat("yyyyMMdd-HHmmss.SSSSSS");
+ String time = formatter.format(new Date());
+
+ String id =
+ String.format("%4s %26s ", level.toString(), time)
+ + getRecordIdForMessage(message);
+
+ result.setMapField(id, contentMap);
+
+ return result;
+ }
+
+ String getRecordIdForMessage(Message message)
+ {
+ if (message.getMsgType().equals(MessageType.STATE_TRANSITION))
+ {
+ return message.getPartitionName() + " Trans:" + message.getFromState().charAt(0)
+ + "->" + message.getToState().charAt(0) + " " + UUID.randomUUID().toString();
+ }
+ else
+ {
+ return message.getMsgType() + " " + UUID.randomUUID().toString();
+ }
+ }
+
+ /**
+ * Create a statusupdate that is related to a cluster manager message, then record it to
+ * the zookeeper store.
+ *
+ * @param message
+ * the related cluster manager message
+ * @param level
+ * the error level
+ * @param classInfo
+ * class info about the class that reports the status update
+ * @param additional
+ * info the additional debug information
+ * @param accessor
+ * the zookeeper data accessor that writes the status update to zookeeper
+ */
+ public void logMessageStatusUpdateRecord(Message message,
+ Level level,
+ Class classInfo,
+ String additionalInfo,
+ HelixDataAccessor accessor)
+ {
+ try
+ {
+ ZNRecord record =
+ createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
+ publishStatusUpdateRecord(record, message, level, accessor);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception while logging status update", e);
+ }
+ }
+
+ public void logError(Message message,
+ Class classInfo,
+ String additionalInfo,
+ HelixDataAccessor accessor)
+ {
+ logMessageStatusUpdateRecord(message,
+ Level.HELIX_ERROR,
+ classInfo,
+ additionalInfo,
+ accessor);
+ }
+
+ public void logError(Message message,
+ Class classInfo,
+ Exception e,
+ String additionalInfo,
+ HelixDataAccessor accessor)
+ {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ e.printStackTrace(pw);
+ logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo
+ + sw.toString(), accessor);
+ }
+
+ public void logInfo(Message message,
+ Class classInfo,
+ String additionalInfo,
+ HelixDataAccessor accessor)
+ {
+ logMessageStatusUpdateRecord(message,
+ Level.HELIX_INFO,
+ classInfo,
+ additionalInfo,
+ accessor);
+ }
+
+ public void logWarning(Message message,
+ Class classInfo,
+ String additionalInfo,
+ HelixDataAccessor accessor)
+ {
+ logMessageStatusUpdateRecord(message,
+ Level.HELIX_WARNING,
+ classInfo,
+ additionalInfo,
+ accessor);
+ }
+
+ /**
+ * Write a status update record to zookeeper to the zookeeper store.
+ *
+ * @param record
+ * the status update record
+ * @param message
+ * the message to be logged
+ * @param level
+ * the error level of the message update
+ * @param accessor
+ * the zookeeper data accessor that writes the status update to zookeeper
+ */
+ void publishStatusUpdateRecord(ZNRecord record,
+ Message message,
+ Level level,
+ HelixDataAccessor accessor)
+ {
+ String instanceName = message.getTgtName();
+ String statusUpdateSubPath = getStatusUpdateSubPath(message);
+ String statusUpdateKey = getStatusUpdateKey(message);
+ String sessionId = message.getExecutionSessionId();
+ if (sessionId == null)
+ {
+ sessionId = message.getTgtSessionId();
+ }
+ if (sessionId == null)
+ {
+ sessionId = "*";
+ }
+
+ Builder keyBuilder = accessor.keyBuilder();
+ if (!_recordedMessages.containsKey(message.getMsgId()))
+ {
+ // TODO instanceName of a controller might be any string
+ if (instanceName.equalsIgnoreCase("Controller"))
+ {
+ accessor.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath,
+ statusUpdateKey),
+ new StatusUpdate(createMessageLogRecord(message)));
+
+ }
+ else
+ {
+
+ PropertyKey propertyKey =
+ keyBuilder.stateTransitionStatus(instanceName,
+ sessionId,
+ statusUpdateSubPath,
+ statusUpdateKey);
+
+ ZNRecord statusUpdateRecord = createMessageLogRecord(message);
+
+ // For now write participant StatusUpdates to log4j.
+ // we are using restlet as another data channel to report to controller.
+
+ _logger.info("StatusUpdate path:" + propertyKey.getPath() + ", updates:"
+ + statusUpdateRecord);
+ accessor.updateProperty(propertyKey, new StatusUpdate(statusUpdateRecord));
+
+ }
+ _recordedMessages.put(message.getMsgId(), message.getMsgId());
+ }
+
+ if (instanceName.equalsIgnoreCase("Controller"))
+ {
+ accessor.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath,
+ statusUpdateKey),
+ new StatusUpdate(record));
+ }
+ else
+ {
+
+ PropertyKey propertyKey =
+ keyBuilder.stateTransitionStatus(instanceName,
+ sessionId,
+ statusUpdateSubPath,
+ statusUpdateKey);
+ // For now write participant StatusUpdates to log4j.
+ // we are using restlet as another data channel to report to controller.
+ _logger.info("StatusUpdate path:" + propertyKey.getPath() + ", updates:" + record);
+ accessor.updateProperty(propertyKey, new StatusUpdate(record));
+ }
+
+ // If the error level is ERROR, also write the record to "ERROR" ZNode
+ if (Level.HELIX_ERROR == level)
+ {
+ publishErrorRecord(record, message, accessor);
+ }
+ }
+
+ private String getStatusUpdateKey(Message message)
+ {
+ if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
+ {
+ return message.getPartitionName();
+ }
+ return message.getMsgId();
+ }
+
+ /**
+ * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
+ *
+ */
+ String getStatusUpdateSubPath(Message message)
+ {
+ if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
+ {
+ return message.getResourceName();
+ }
+ else
+ {
+ return message.getMsgType();
+ }
+ }
+
+ String getStatusUpdateRecordName(Message message)
+ {
+ if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
+ {
+ return message.getTgtSessionId() + "__" + message.getResourceName();
+ }
+ return message.getMsgId();
+ }
+
+ /**
+ * Write an error record to zookeeper to the zookeeper store.
+ *
+ * @param record
+ * the status update record
+ * @param message
+ * the message to be logged
+ * @param accessor
+ * the zookeeper data accessor that writes the status update to zookeeper
+ */
+ void publishErrorRecord(ZNRecord record, Message message, HelixDataAccessor accessor)
+ {
+ String instanceName = message.getTgtName();
+ String statusUpdateSubPath = getStatusUpdateSubPath(message);
+ String statusUpdateKey = getStatusUpdateKey(message);
+ String sessionId = message.getExecutionSessionId();
+ if (sessionId == null)
+ {
+ sessionId = message.getTgtSessionId();
+ }
+ if (sessionId == null)
+ {
+ sessionId = "*";
+ }
+
+ Builder keyBuilder = accessor.keyBuilder();
+
+ // TODO remove the hard code: "controller"
+ if (instanceName.equalsIgnoreCase("controller"))
+ {
+ // TODO need to fix: ERRORS_CONTROLLER doesn't have a form of
+ // ../{sessionId}/{subPath}
+ // accessor.setProperty(PropertyType.ERRORS_CONTROLLER, record,
+ // statusUpdateSubPath);
+ accessor.setProperty(keyBuilder.controllerTaskError(statusUpdateSubPath),
+ new Error(record));
+ }
+ else
+ {
+ // accessor.updateProperty(PropertyType.ERRORS,
+ // record,
+ // instanceName,
+ // sessionId,
+ // statusUpdateSubPath,
+ // statusUpdateKey);
+ accessor.updateProperty(keyBuilder.stateTransitionError(instanceName,
+ sessionId,
+ statusUpdateSubPath,
+ statusUpdateKey),
+ new Error(record));
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/StringTemplate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/StringTemplate.java b/helix-core/src/main/java/org/apache/helix/util/StringTemplate.java
new file mode 100644
index 0000000..7ceef93
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/StringTemplate.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+public class StringTemplate
+{
+ private static Logger LOG = Logger.getLogger(StringTemplate.class);
+
+ Map<Enum, Map<Integer, String>> templateMap = new HashMap<Enum, Map<Integer, String>>();
+ static Pattern pattern = Pattern.compile("(\\{.+?\\})");
+
+ public void addEntry(Enum type, int numKeys, String template)
+ {
+ if (!templateMap.containsKey(type))
+ {
+ templateMap.put(type, new HashMap<Integer, String>());
+ }
+ LOG.trace("Add template for type: " + type.name() + ", arguments: " + numKeys
+ + ", template: " + template);
+ templateMap.get(type).put(numKeys, template);
+ }
+
+ public String instantiate(Enum type, String... keys)
+ {
+ if (keys == null)
+ {
+ keys = new String[] {};
+ }
+
+ String template = null;
+ if (templateMap.containsKey(type))
+ {
+ template = templateMap.get(type).get(keys.length);
+ }
+
+ String result = null;
+
+ if (template != null)
+ {
+ result = template;
+ Matcher matcher = pattern.matcher(template);
+ int count = 0;
+ while (matcher.find())
+ {
+ String var = matcher.group();
+ result = result.replace(var, keys[count]);
+ count++;
+ }
+ }
+
+ if (result == null || result.indexOf('{') > -1 || result.indexOf('}') > -1)
+ {
+ String errMsg = "Unable to instantiate template: " + template
+ + " using keys: " + Arrays.toString(keys);
+ LOG.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
new file mode 100644
index 0000000..ea1368f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.zookeeper.ZooKeeper.States;
+
+
+public class ZKClientPool
+{
+ static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<String, ZkClient>();
+ static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
+
+ public static ZkClient getZkClient(String zkServer)
+ {
+ // happy path that we cache the zkclient and it's still connected
+ if (_zkClientMap.containsKey(zkServer))
+ {
+ ZkClient zkClient = _zkClientMap.get(zkServer);
+ if (zkClient.getConnection().getZookeeperState() == States.CONNECTED)
+ {
+ return zkClient;
+ }
+ }
+
+ synchronized (_zkClientMap)
+ {
+ // if we cache a stale zkclient, purge it
+ if (_zkClientMap.containsKey(zkServer))
+ {
+ ZkClient zkClient = _zkClientMap.get(zkServer);
+ if (zkClient.getConnection().getZookeeperState() != States.CONNECTED)
+ {
+ _zkClientMap.remove(zkServer);
+ }
+ }
+
+ // get a new zkclient
+ if (!_zkClientMap.containsKey(zkServer))
+ {
+ ZkClient zkClient = new ZkClient(zkServer, DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+
+ _zkClientMap.put(zkServer, zkClient);
+ }
+ return _zkClientMap.get(zkServer);
+ }
+ }
+
+ public static void reset()
+ {
+ _zkClientMap.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
new file mode 100644
index 0000000..bc8bc11
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+//TODO find a proper place for these methods
+public final class ZNRecordUtil
+{
+ private static final Logger logger = Logger.getLogger(ZNRecordUtil.class.getName());
+
+ private ZNRecordUtil()
+ {
+ }
+
+ public static ZNRecord find(String id, List<ZNRecord> list)
+ {
+ for (ZNRecord record : list)
+ {
+ if (record.getId() != null && record.getId().equals(id))
+ {
+ return record;
+ }
+ }
+ return null;
+ }
+
+ public static Map<String, ZNRecord> convertListToMap(List<ZNRecord> recordList)
+ {
+ Map<String, ZNRecord> recordMap = new HashMap<String, ZNRecord>();
+ for (ZNRecord record : recordList)
+ {
+ if (record.getId() != null)
+ {
+ recordMap.put(record.getId(), record);
+ }
+ }
+ return recordMap;
+ }
+
+ public static <T extends Object> List<T> convertListToTypedList(List<ZNRecord> recordList,
+ Class<T> clazz)
+ {
+ List<T> list = new ArrayList<T>();
+ for (ZNRecord record : recordList)
+ {
+ if (record.getId() == null)
+ {
+ logger.error("Invalid record: Id missing in " + record);
+ continue;
+ }
+ try
+ {
+
+ Constructor<T> constructor = clazz.getConstructor(new Class[] { ZNRecord.class });
+ T instance = constructor.newInstance(record);
+ list.add(instance);
+ }
+ catch (Exception e)
+ {
+ logger.error("Error creating an Object of type:" + clazz.getCanonicalName(), e);
+ }
+ }
+ return list;
+ }
+
+ public static <T extends Object> Map<String, T> convertListToTypedMap(List<ZNRecord> recordList,
+ Class<T> clazz)
+ {
+ Map<String, T> map = new HashMap<String, T>();
+ for (ZNRecord record : recordList)
+ {
+ if (record.getId() == null)
+ {
+ logger.error("Invalid record: Id missing in " + record);
+ continue;
+ }
+ try
+ {
+
+ Constructor<T> constructor = clazz.getConstructor(new Class[] { ZNRecord.class });
+ T instance = constructor.newInstance(record);
+ map.put(record.getId(), instance);
+ }
+ catch (Exception e)
+ {
+ logger.error("Error creating an Object of type:" + clazz.getCanonicalName(), e);
+ }
+ }
+ return map;
+ }
+
+ public static <T extends Object> List<T> convertMapToList(Map<String, T> map)
+ {
+ List<T> list = new ArrayList<T>();
+ for (T t : map.values())
+ {
+ list.add(t);
+ }
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/package-info.java b/helix-core/src/main/java/org/apache/helix/util/package-info.java
new file mode 100644
index 0000000..eb14cb3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix utility classes
+ *
+ */
+package org.apache.helix.util;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/AppTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/AppTest.java b/helix-core/src/test/java/org/apache/helix/AppTest.java
new file mode 100644
index 0000000..80961c6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/AppTest.java
@@ -0,0 +1,189 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.helix.model.Message;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+{
+ /**
+ * Create the test case
+ *
+ * @param testName
+ * name of the test case
+ */
+ public AppTest(String testName)
+ {
+ }
+
+ @Test(enabled = false)
+ private static void testChrootWithZkClient() throws Exception
+ {
+ ZkClient client = new ZkClient("localhost:2181/foo");
+ IZkStateListener stateChangeListener = new IZkStateListener()
+ {
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception
+ {
+ System.out
+ .println("AppTest.main(...).new IZkStateListener() {...}.handleStateChanged()"
+ + state);
+ }
+
+ @Override
+ public void handleNewSession() throws Exception
+ {
+ System.out
+ .println("AppTest.main(...).new IZkStateListener() {...}.handleNewSession()");
+ }
+ };
+ client.subscribeStateChanges(stateChangeListener);
+ boolean waitUntilConnected = client.waitUntilConnected(10000,
+ TimeUnit.MILLISECONDS);
+ System.out.println("Connected " + waitUntilConnected);
+ client.waitForKeeperState(KeeperState.Disconnected, 20000,
+ TimeUnit.MILLISECONDS);
+ // server.start();
+ client.waitUntilConnected();
+ Thread.currentThread().join();
+ }
+
+ @Test(enabled = false)
+ private static void testChroot() throws Exception
+ {
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ System.out.println("Event:" + event);
+ }
+ };
+ ZooKeeper zk = new ZooKeeper("localhost:2181/foo", 6000, watcher);
+ // uncommenting this line will not cause infinite connect/disconnect
+ // zk.create("/", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ zk.exists("/", true);
+ System.out
+ .println("Stop the server and restart it when you see this message");
+ Thread.currentThread().join();
+ }
+
+ @Test(enabled = false)
+ private static void testZKClient() throws InterruptedException
+ {
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+ {
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient)
+ {
+ }
+ };
+ String dataDir = "/tmp/dataDir";
+ String logDir = "/tmp/logDir";
+ // ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace, 2181);
+ // server.start();
+
+ ZkClient client = new ZkClient("localhost:2181/foo");
+ IZkStateListener stateChangeListener = new IZkStateListener()
+ {
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception
+ {
+ System.out
+ .println("AppTest.main(...).new IZkStateListener() {...}.handleStateChanged()"
+ + state);
+ }
+
+ @Override
+ public void handleNewSession() throws Exception
+ {
+ System.out
+ .println("AppTest.main(...).new IZkStateListener() {...}.handleNewSession()");
+ }
+ };
+ client.subscribeStateChanges(stateChangeListener);
+ boolean waitUntilConnected = client.waitUntilConnected(10000,
+ TimeUnit.MILLISECONDS);
+ System.out.println("Connected " + waitUntilConnected);
+ IZkChildListener listener1 = new IZkChildListener()
+ {
+
+ @Override
+ public void handleChildChange(String parentPath,
+ List<String> currentChilds) throws Exception
+ {
+ System.out.println("listener 1 Change at path:" + parentPath);
+ }
+ };
+ IZkChildListener listener2 = new IZkChildListener()
+ {
+ @Override
+ public void handleChildChange(String parentPath,
+ List<String> currentChilds) throws Exception
+ {
+ System.out.println("listener2 Change at path:" + parentPath);
+ }
+ };
+
+ client.subscribeChildChanges("/", listener1);
+ client.subscribeChildChanges("/foo", listener2);
+
+ // server.shutdown();
+ client.waitForKeeperState(KeeperState.Disconnected, 20000,
+ TimeUnit.MILLISECONDS);
+ // server.start();
+ client.waitUntilConnected();
+
+ Thread.sleep(1000);
+ client.setZkSerializer(new BytesPushThroughSerializer());
+ client.create("/test", new byte[0], CreateMode.EPHEMERAL);
+ Thread.sleep(1000);
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ //testChroot();
+ // testZKClient();
+ // testChrootWithZkClient();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
new file mode 100644
index 0000000..8dda442
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.mock.storage.DummyProcess.DummyLeaderStandbyStateModelFactory;
+import org.apache.helix.mock.storage.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.mock.storage.DummyProcess.DummyStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+public class DummyProcessThread implements Runnable
+{
+ private static final Logger LOG = Logger.getLogger(DummyProcessThread.class);
+
+ HelixManager _manager;
+ String _instanceName;
+
+ public DummyProcessThread(HelixManager manager, String instanceName)
+ {
+ _manager = manager;
+ _instanceName = instanceName;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ DummyStateModelFactory stateModelFactory = new DummyStateModelFactory(0);
+// StateMachineEngine genericStateMachineHandler =
+// new StateMachineEngine();
+ StateMachineEngine stateMach = _manager.getStateMachineEngine();
+ stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+
+ DummyLeaderStandbyStateModelFactory stateModelFactory1 = new DummyLeaderStandbyStateModelFactory(10);
+ DummyOnlineOfflineStateModelFactory stateModelFactory2 = new DummyOnlineOfflineStateModelFactory(10);
+ stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
+ stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+// _manager.getMessagingService()
+// .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+// genericStateMachineHandler);
+
+ _manager.connect();
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException e)
+ {
+ String msg =
+ "participant:" + _instanceName + ", " + Thread.currentThread().getName()
+ + " interrupted";
+ LOG.info(msg);
+ // System.err.println(msg);
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/ExternalCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ExternalCommand.java b/helix-core/src/test/java/org/apache/helix/ExternalCommand.java
new file mode 100644
index 0000000..5183cde
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/ExternalCommand.java
@@ -0,0 +1,389 @@
+package org.apache.helix;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.log4j.Logger;
+
+/**
+* This class encapsulates a java <code>Process</code> to handle properly
+* output and error and preventing potential deadlocks. The idea is that the
+* command is executed and you can get the error or output. Simple to use.
+* Should not be used for big outputs because they are buffered internally.
+*
+* @author ypujante@linkedin.com
+*/
+public class ExternalCommand
+{
+ public static final String MODULE = ExternalCommand.class.getName();
+ public static final Logger LOG = Logger.getLogger(MODULE);
+
+ private final ProcessBuilder _processBuilder;
+
+ private Process _process;
+ private InputReader _out;
+ private InputReader _err;
+
+ private static class InputReader extends Thread
+ {
+ private static final int BUFFER_SIZE = 2048;
+
+ private final InputStream _in;
+ private final ByteArrayOutputStream _out;
+ private boolean _running = false;
+
+ InputReader(InputStream in)
+ {
+ _in = in;
+ _out = new ByteArrayOutputStream();
+ }
+
+ @Override
+ public void run()
+ {
+ _running = true;
+
+ byte[] buf = new byte[BUFFER_SIZE];
+ int n = 0;
+ try
+ {
+ while((n = _in.read(buf)) != -1)
+ _out.write(buf, 0, n);
+ }
+ catch(IOException e)
+ {
+ LOG.error("error while reading external command", e);
+ }
+
+ _running = false;
+ }
+
+ public byte[] getOutput()
+ {
+ if(_running)
+ throw new IllegalStateException("wait for process to be completed");
+
+ return _out.toByteArray();
+ }
+ }
+ /**
+* Constructor */
+ public ExternalCommand(ProcessBuilder processBuilder)
+ {
+ _processBuilder = processBuilder;
+ }
+
+ /**
+* After creating the command, you have to start it...
+*
+* @throws IOException
+*/
+ public void start() throws IOException
+ {
+ _process = _processBuilder.start();
+ _out = new InputReader(new BufferedInputStream(_process.getInputStream()));
+ _err = new InputReader(new BufferedInputStream(_process.getErrorStream()));
+
+ _out.start();
+ _err.start();
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public Map<String, String> getEnvironment()
+ {
+ return _processBuilder.environment();
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public File getWorkingDirectory()
+ {
+ return _processBuilder.directory();
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public void setWorkingDirectory(File directory)
+ {
+ _processBuilder.directory(directory);
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public boolean getRedirectErrorStream()
+ {
+ return _processBuilder.redirectErrorStream();
+ }
+
+ /**
+* @see ProcessBuilder
+*/
+ public void setRedirectErrorStream(boolean redirectErrorStream)
+ {
+ _processBuilder.redirectErrorStream(redirectErrorStream);
+ }
+
+ public byte[] getOutput() throws InterruptedException
+ {
+ waitFor();
+ return _out.getOutput();
+ }
+
+ public byte[] getError() throws InterruptedException
+ {
+ waitFor();
+ return _err.getOutput();
+ }
+
+ /**
+* Returns the output as a string.
+*
+* @param encoding
+* @return encoded string
+* @throws InterruptedException
+* @throws UnsupportedEncodingException
+*/
+ public String getStringOutput(String encoding) throws InterruptedException,
+ UnsupportedEncodingException
+ {
+ return new String(getOutput(), encoding);
+ }
+
+ /**
+* Returns the output as a string. Uses encoding "UTF-8".
+*
+* @return utf8 encoded string
+* @throws InterruptedException
+*/
+ public String getStringOutput() throws InterruptedException
+ {
+ try
+ {
+ return getStringOutput("UTF-8");
+ }
+ catch(UnsupportedEncodingException e)
+ {
+ // should not happen
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+* Returns the error as a string.
+*
+* @param encoding
+* @return error as string
+* @throws InterruptedException
+* @throws UnsupportedEncodingException
+*/
+ public String getStringError(String encoding) throws InterruptedException,
+ UnsupportedEncodingException
+ {
+ return new String(getError(), encoding);
+ }
+
+ /**
+* Returns the error as a string. Uses encoding "UTF-8".
+*
+* @return error as string
+* @throws InterruptedException
+*/
+ public String getStringError() throws InterruptedException
+ {
+ try
+ {
+ return getStringError("UTF-8");
+ }
+ catch(UnsupportedEncodingException e)
+ {
+ // should not happen
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+* Properly waits until everything is complete: joins on the thread that
+* reads the output, joins on the thread that reads the error and finally
+* wait for the process to be finished.
+* @return the status code of the process.
+*
+* @throws InterruptedException
+*/
+ public int waitFor() throws InterruptedException
+ {
+ if(_process == null)
+ throw new IllegalStateException("you must call start first");
+
+ _out.join();
+ _err.join();
+ return _process.waitFor();
+ }
+
+ /**
+* Properly waits until everything is complete: joins on the thread that
+* reads the output, joins on the thread that reads the error and finally
+* wait for the process to be finished.
+* If the process has not completed before the timeout, throws a
+* {@link TimeoutException}
+* @return the status code of the process.
+*
+* @throws TimeoutException
+* @throws InterruptedException
+*/
+ public int waitFor(long timeout) throws InterruptedException, TimeoutException
+ {
+ if(_process == null)
+ throw new IllegalStateException("you must call start first");
+
+// Chronos c = new Chronos();
+ _out.join(timeout);
+// timeout -= c.tick();
+ if (timeout <= 0)
+ throw new TimeoutException("Wait timed out");
+ _err.join(timeout);
+// timeout -= c.tick();
+ if (timeout <= 0)
+ throw new TimeoutException("Wait timed out");
+
+ // there is no timeout in this API, not much we can do here
+ // waiting on the other two threads should give us some safety
+ return _process.waitFor();
+ }
+
+ public int exitValue()
+ {
+ if(_process == null)
+ throw new IllegalStateException("you must call start first");
+
+ return _process.exitValue();
+ }
+
+ public void destroy()
+ {
+ if(_process == null)
+ throw new IllegalStateException("you must call start first");
+
+ _process.destroy();
+ }
+
+ /**
+* Creates an external process from the command. It is not started and you have to call
+* start on it!
+*
+* @param commands the command to execute
+* @return the process */
+ public static ExternalCommand create(String... commands)
+ {
+ ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+ return ec;
+ }
+
+ /**
+* Creates an external process from the command. It is not started and you have to call
+* start on it!
+*
+* @param commands the command to execute
+* @return the process */
+ public static ExternalCommand create(List<String> commands)
+ {
+ ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+ return ec;
+ }
+
+ /**
+* Creates an external process from the command. The command is executed.
+*
+* @param commands the commands to execute
+* @return the process
+* @throws IOException if there is an error */
+ public static ExternalCommand start(String... commands) throws IOException
+ {
+ ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+ ec.start();
+ return ec;
+ }
+
+ /**
+* Executes the external command in the given working directory and waits for it to be
+* finished.
+*
+* @param workingDirectory the root directory from where to run the command
+* @param command the command to execute (should be relative to the working directory
+* @param args the arguments to the command
+* @return the process */
+ public static ExternalCommand execute(File workingDirectory,
+ String command,
+ String... args)
+ throws IOException, InterruptedException
+ {
+ try
+ {
+ return executeWithTimeout(workingDirectory, command, 0, args);
+ }
+ catch (TimeoutException e)
+ {
+ // Can't happen!
+ throw new IllegalStateException(MODULE + ".execute: Unexpected timeout occurred!");
+ }
+ }
+
+/**
+* Executes the external command in the given working directory and waits (until timeout
+* is elapsed) for it to be finished.
+*
+* @param workingDirectory
+* the root directory from where to run the command
+* @param command
+* the command to execute (should be relative to the working directory
+* @param timeout
+* the maximum amount of time to wait for this external command (in ms). If
+* this value is less than or equal to 0, timeout is ignored
+* @param args
+* the arguments to the command
+* @return the process
+*/
+ public static ExternalCommand executeWithTimeout(File workingDirectory,
+ String command,
+ long timeout,
+ String... args)
+ throws IOException, InterruptedException, TimeoutException
+ {
+ List<String> arguments = new ArrayList<String>(args.length + 1);
+
+ arguments.add(new File(workingDirectory, command).getAbsolutePath());
+ arguments.addAll(Arrays.asList(args));
+
+ ExternalCommand cmd = ExternalCommand.create(arguments);
+
+ cmd.setWorkingDirectory(workingDirectory);
+
+ cmd.setRedirectErrorStream(true);
+
+ cmd.start();
+
+ /* Use timeout if it is a valid value! */
+ if (timeout <= 0)
+ cmd.waitFor();
+ else
+ cmd.waitFor(timeout);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(cmd.getStringOutput());
+
+ return cmd;
+ }
+}