You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[25/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZkLogCSVFormatter.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogCSVFormatter.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogCSVFormatter.java
new file mode 100644
index 0000000..d01a241
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkLogCSVFormatter.java
@@ -0,0 +1,445 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.util.HelixUtil;
+
+
+public class ZkLogCSVFormatter
+{
+  private static final ZNRecordSerializer _deserializer = new ZNRecordSerializer();
+  private static String _fieldDelim = ",";
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length != 2)
+    {
+      System.err.println("USAGE: ZkLogCSVFormatter log_file output_dir");
+      System.exit(2);
+    }
+    File outputDir = new File(args[1]);
+    if (!outputDir.exists() || !outputDir.isDirectory())
+    {
+      System.err.println(outputDir.getAbsolutePath() + " does NOT exist or is NOT a directory");
+      System.exit(2);
+    }
+    format(args[0], args[1]);
+  }
+
+  private static void formatter(BufferedWriter bw, String... args)
+  {
+    StringBuffer sb = new StringBuffer();
+
+    if (args.length == 0)
+    {
+      return;
+    }
+    else
+    {
+      sb.append(args[0]);
+      for (int i = 1; i < args.length; i++)
+      {
+        sb.append(_fieldDelim).append(args[i]);
+      }
+    }
+
+    try
+    {
+      bw.write(sb.toString());
+      bw.newLine();
+      // System.out.println(sb.toString());
+    }
+    catch (IOException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  private static String getAttributeValue(String line, String attribute)
+  {
+    String[] parts = line.split("\\s");
+    if (parts != null && parts.length > 0)
+    {
+      for (int i = 0; i < parts.length; i++)
+      {
+        if (parts[i].startsWith(attribute))
+        {
+          String val = parts[i].substring(attribute.length());
+          return val;
+        }
+      }
+    }
+    return null;
+  }
+
+  private static void format(String logfilepath, String outputDir) throws FileNotFoundException
+  {
+    try
+    {
+      // input file
+      FileInputStream fis = new FileInputStream(logfilepath);
+      BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+
+      // output files
+      FileOutputStream isFos = new FileOutputStream(outputDir + "/" + "idealState.csv");
+      BufferedWriter isBw = new BufferedWriter(new OutputStreamWriter(isFos));
+
+      FileOutputStream cfgFos = new FileOutputStream(outputDir + "/" + "config.csv");
+      BufferedWriter cfgBw = new BufferedWriter(new OutputStreamWriter(cfgFos));
+
+      FileOutputStream evFos = new FileOutputStream(outputDir + "/" + "externalView.csv");
+      BufferedWriter evBw = new BufferedWriter(new OutputStreamWriter(evFos));
+
+      FileOutputStream smdCntFos =
+          new FileOutputStream(outputDir + "/" + "stateModelDefStateCount.csv");
+      BufferedWriter smdCntBw = new BufferedWriter(new OutputStreamWriter(smdCntFos));
+
+      FileOutputStream smdNextFos =
+          new FileOutputStream(outputDir + "/" + "stateModelDefStateNext.csv");
+      BufferedWriter smdNextBw = new BufferedWriter(new OutputStreamWriter(smdNextFos));
+
+      FileOutputStream csFos = new FileOutputStream(outputDir + "/" + "currentState.csv");
+      BufferedWriter csBw = new BufferedWriter(new OutputStreamWriter(csFos));
+
+      FileOutputStream msgFos = new FileOutputStream(outputDir + "/" + "messages.csv");
+      BufferedWriter msgBw = new BufferedWriter(new OutputStreamWriter(msgFos));
+
+      FileOutputStream hrPerfFos = new FileOutputStream(outputDir + "/" + "healthReportDefaultPerfCounters.csv");
+      BufferedWriter hrPerfBw = new BufferedWriter(new OutputStreamWriter(hrPerfFos));
+
+      FileOutputStream liFos =
+          new FileOutputStream(outputDir + "/" + "liveInstances.csv");
+      BufferedWriter liBw = new BufferedWriter(new OutputStreamWriter(liFos));
+
+      formatter(cfgBw, "timestamp", "instanceName", "host", "port", "enabled");
+      formatter(isBw,
+                "timestamp",
+                "resourceName",
+                "partitionNumber",
+                "mode",
+                "partition",
+                "instanceName",
+                "priority");
+      formatter(evBw, "timestamp", "resourceName", "partition", "instanceName", "state");
+      formatter(smdCntBw, "timestamp", "stateModel", "state", "count");
+      formatter(smdNextBw, "timestamp", "stateModel", "from", "to", "next");
+      formatter(liBw, "timestamp", "instanceName", "sessionId", "Operation");
+      formatter(csBw,
+                "timestamp",
+                "resourceName",
+                "partition",
+                "instanceName",
+                "sessionId",
+                "state");
+      formatter(msgBw,
+                "timestamp",
+                "resourceName",
+                "partition",
+                "instanceName",
+                "sessionId",
+                "from",
+                "to",
+                "messageType",
+                "messageState");
+      formatter(hrPerfBw,
+                "timestamp",
+                "instanceName",
+                "availableCPUs",
+                "averageSystemLoad",
+                "freeJvmMemory",
+                "freePhysicalMemory",
+                "totalJvmMemory");
+
+      Map<String, ZNRecord> liveInstanceSessionMap = new HashMap<String, ZNRecord>();
+
+      int pos;
+      String inputLine;
+      while ((inputLine = br.readLine()) != null)
+      {
+        if (inputLine.indexOf("CONFIGS") != -1)
+        {
+          pos = inputLine.indexOf("CONFIGS");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1)
+          {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+                                                              .getBytes());
+
+            formatter(cfgBw,
+                      timestamp,
+                      record.getId(),
+                      record.getSimpleField("HOST"),
+                      record.getSimpleField("PORT"),
+                      record.getSimpleField("ENABLED"));
+
+          }
+        }
+        else if (inputLine.indexOf("IDEALSTATES") != -1)
+        {
+          pos = inputLine.indexOf("IDEALSTATES");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1)
+          {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+                                                              .getBytes());
+            // System.out.println("record=" + record);
+            for (String partition : record.getListFields().keySet())
+            {
+              List<String> preferenceList = record.getListFields().get(partition);
+              for (int i = 0; i < preferenceList.size(); i++)
+              {
+                String instance = preferenceList.get(i);
+                formatter(isBw,
+                          timestamp,
+                          record.getId(),
+                          record.getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString()),
+                          record.getSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString()),
+                          partition,
+                          instance,
+                          Integer.toString(i));
+              }
+            }
+          }
+        }
+        else if (inputLine.indexOf("LIVEINSTANCES") != -1)
+        {
+          pos = inputLine.indexOf("LIVEINSTANCES");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1)
+          {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+                                                              .getBytes());
+            formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"), "ADD");
+            String zkSessionId = getAttributeValue(inputLine, "session:");
+            if (zkSessionId == null)
+            {
+              System.err.println("no zk session id associated with the adding of live instance: "
+                  + inputLine);
+            }
+            else
+            {
+              liveInstanceSessionMap.put(zkSessionId, record);
+            }
+          }
+
+        }
+        else if (inputLine.indexOf("EXTERNALVIEW") != -1)
+        {
+          pos = inputLine.indexOf("EXTERNALVIEW");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1)
+          {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+                                                              .getBytes());
+            // System.out.println("record=" + record);
+            for (String partition : record.getMapFields().keySet())
+            {
+              Map<String, String> stateMap = record.getMapFields().get(partition);
+              for (String instance : stateMap.keySet())
+              {
+                String state = stateMap.get(instance);
+                formatter(evBw, timestamp, record.getId(), partition, instance, state);
+              }
+            }
+          }
+        }
+        else if (inputLine.indexOf("STATEMODELDEFS") != -1)
+        {
+          pos = inputLine.indexOf("STATEMODELDEFS");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1)
+          {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+                                                              .getBytes());
+
+            for (String stateInfo : record.getMapFields().keySet())
+            {
+              if (stateInfo.endsWith(".meta"))
+              {
+                Map<String, String> metaMap = record.getMapFields().get(stateInfo);
+                formatter(smdCntBw,
+                          timestamp,
+                          record.getId(),
+                          stateInfo.substring(0, stateInfo.indexOf('.')),
+                          metaMap.get("count"));
+              }
+              else if (stateInfo.endsWith(".next"))
+              {
+                Map<String, String> nextMap = record.getMapFields().get(stateInfo);
+                for (String destState : nextMap.keySet())
+                {
+                  formatter(smdNextBw,
+                            timestamp,
+                            record.getId(),
+                            stateInfo.substring(0, stateInfo.indexOf('.')),
+                            destState,
+                            nextMap.get(destState));
+                }
+              }
+            }
+          }
+        }
+        else if (inputLine.indexOf("CURRENTSTATES") != -1)
+        {
+          pos = inputLine.indexOf("CURRENTSTATES");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1)
+          {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+                                                              .getBytes());
+            // System.out.println("record=" + record);
+            for (String partition : record.getMapFields().keySet())
+            {
+              Map<String, String> stateMap = record.getMapFields().get(partition);
+              String path = getAttributeValue(inputLine, "path:");
+              if (path != null)
+              {
+                String instance = HelixUtil.getInstanceNameFromPath(path);
+                formatter(csBw,
+                          timestamp,
+                          record.getId(),
+                          partition,
+                          instance,
+                          record.getSimpleField("SESSION_ID"),
+                          stateMap.get("CURRENT_STATE"));
+              }
+            }
+          }
+        }
+        else if (inputLine.indexOf("MESSAGES") != -1)
+        {
+          pos = inputLine.indexOf("MESSAGES");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1)
+          {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+                                                              .getBytes());
+
+            formatter(msgBw,
+                      timestamp,
+                      record.getSimpleField("RESOURCE_NAME"),
+                      record.getSimpleField("PARTITION_NAME"),
+                      record.getSimpleField("TGT_NAME"),
+                      record.getSimpleField("TGT_SESSION_ID"),
+                      record.getSimpleField("FROM_STATE"),
+                      record.getSimpleField("TO_STATE"),
+                      record.getSimpleField("MSG_TYPE"),
+                      record.getSimpleField("MSG_STATE"));
+          }
+
+        }
+        else if (inputLine.indexOf("closeSession") != -1)
+        {
+          String zkSessionId = getAttributeValue(inputLine, "session:");
+          if (zkSessionId == null)
+          {
+            System.err.println("no zk session id associated with the closing of zk session: "
+                + inputLine);
+          }
+          else
+          {
+            ZNRecord record = liveInstanceSessionMap.remove(zkSessionId);
+            // System.err.println("zkSessionId:" + zkSessionId + ", record:" + record);
+            if (record != null)
+            {
+              String timestamp = getAttributeValue(inputLine, "time:");
+              formatter(liBw,
+                        timestamp,
+                        record.getId(),
+                        record.getSimpleField("SESSION_ID"),
+                        "DELETE");
+            }
+          }
+        }
+        else if (inputLine.indexOf("HEALTHREPORT/defaultPerfCounters") != -1)
+        {
+          pos = inputLine.indexOf("HEALTHREPORT/defaultPerfCounters");
+          pos = inputLine.indexOf("data:{", pos);
+          if (pos != -1)
+          {
+            String timestamp = getAttributeValue(inputLine, "time:");
+            ZNRecord record =
+                (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
+                                                              .getBytes());
+
+            String path = getAttributeValue(inputLine, "path:");
+            if (path != null)
+            {
+              String instance = HelixUtil.getInstanceNameFromPath(path);
+              formatter(hrPerfBw,
+                        timestamp,
+                        instance,
+                        record.getSimpleField("availableCPUs"),
+                        record.getSimpleField("averageSystemLoad"),
+                        record.getSimpleField("freeJvmMemory"),
+                        record.getSimpleField("freePhysicalMemory"),
+                        record.getSimpleField("totalJvmMemory"));
+            }
+          }
+        }
+      }
+
+      br.close();
+      isBw.close();
+      cfgBw.close();
+      evBw.close();
+      smdCntBw.close();
+      smdNextBw.close();
+      csBw.close();
+      msgBw.close();
+      liBw.close();
+      hrPerfBw.close();
+    }
+    catch (Exception e)
+    {
+      System.err.println("Error: " + e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZnodeOpArg.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZnodeOpArg.java b/helix-core/src/main/java/org/apache/helix/tools/ZnodeOpArg.java
new file mode 100644
index 0000000..c492ad2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZnodeOpArg.java
@@ -0,0 +1,181 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
+
+
+public class ZnodeOpArg
+{
+  public String _znodePath;
+  public ZnodePropertyType _propertyType;
+  
+  /**
+   *  "+" for update/create if not exist
+   *  '-' for remove
+   * "==" for test equals
+   * "!=" for test not equal
+   */ 
+  public String _operation;
+  public String _key;
+  public ZnodeValue _updateValue;
+  
+  public ZnodeOpArg()
+  {
+  }
+  
+  /**
+   * verify simple/list/map field: no update value
+   * @param znodePath
+   * @param type
+   * @param op
+   * @param key
+   */
+  public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key)
+  {
+    this(znodePath, type, op, key, new ZnodeValue());
+  }
+  
+  /**
+   * verify znode: no update value
+   * @param znodePath
+   * @param type
+   * @param op
+   */
+  public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op)
+  {
+    this(znodePath, type, op, null, new ZnodeValue());
+  }
+  
+  /**
+   * simple field change
+   * @param znodePath
+   * @param type
+   * @param op
+   * @param key
+   * @param update
+   */
+  public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key, String update)
+  {
+    this(znodePath, type, op, key, new ZnodeValue(update));
+  }
+
+  /**
+   * list field change
+   * @param znodePath
+   * @param type
+   * @param op
+   * @param key
+   * @param update
+   */
+  public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key, List<String> update)
+  {
+    this(znodePath, type, op, key, new ZnodeValue(update));
+  }
+
+  /**
+   * map field change
+   * @param znodePath
+   * @param type
+   * @param op
+   * @param key
+   * @param update
+   */
+  public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key, Map<String, String> update)
+  {
+    this(znodePath, type, op, key, new ZnodeValue(update));
+  }
+
+  /**
+   * znode change
+   * @param znodePath
+   * @param type
+   * @param op
+   * @param key
+   * @param update
+   */
+  public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, ZNRecord update)
+  {
+    this(znodePath, type, op, null, new ZnodeValue(update));
+  }
+
+  /**
+   * 
+   * @param znodePath
+   * @param type
+   * @param op
+   * @param key
+   * @param update
+   */
+  public ZnodeOpArg(String znodePath, ZnodePropertyType type, String op, String key, ZnodeValue update)
+  {
+    _znodePath = znodePath;
+    _propertyType = type;
+    _operation = op;
+    _key = key;
+    _updateValue = update;
+  }
+  
+  @Override
+  public String toString()
+  {
+    String ret = "={\"" + 
+                 _znodePath + "\", " + _propertyType + "/" + _key + " " + _operation + " " +
+                 _updateValue + "}";
+    return ret;
+  }
+
+  
+  // TODO temp test; remove it
+  /*
+  public static void main(String[] args) 
+  {
+    // null modification command
+    ZnodeOpArg command = new ZnodeOpArg();
+    System.out.println(command);
+    
+    // simple modification command
+    command = new ZnodeOpArg("/testPath", ZnodePropertyType.SIMPLE, "+", "key1", "simpleValue1");
+    System.out.println(command);
+    
+    // list modification command
+    List<String> list = new ArrayList<String>();
+    list.add("listValue1");
+    list.add("listValue2");
+    command = new ZnodeOpArg("/testPath", ZnodePropertyType.LIST, "+", "key1", list);
+    System.out.println(command);
+    
+    // map modification command
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("mapKey1", "mapValue1");
+    map.put("mapKey2", "mapValue2");
+    command = new ZnodeOpArg("/testPath", ZnodePropertyType.MAP, "+", "key1", map);
+    System.out.println(command);
+
+    // map modification command
+    ZNRecord record = new ZNRecord("znrecord");
+    record.setSimpleField("key1", "simpleValue1");
+    record.setListField("key1", list);
+    record.setMapField("key1", map);
+    command = new ZnodeOpArg("/testPath", ZnodePropertyType.ZNODE, "+", record);
+    System.out.println(command);
+  }
+  */
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ZnodeValue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZnodeValue.java b/helix-core/src/main/java/org/apache/helix/tools/ZnodeValue.java
new file mode 100644
index 0000000..21fb271
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZnodeValue.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+
+
+public class ZnodeValue
+{
+  public String _singleValue;
+  public List<String> _listValue;
+  public Map<String, String> _mapValue;
+  public ZNRecord _znodeValue;
+  
+  public ZnodeValue()
+  {
+  }
+  
+  public ZnodeValue(String value)
+  {
+    _singleValue = value;
+  }
+  
+  public ZnodeValue(List<String> value)
+  {
+    _listValue = value;
+  }
+  
+  public ZnodeValue(Map<String, String> value)
+  {
+    _mapValue = value;
+  }
+  
+  public ZnodeValue(ZNRecord value)
+  {
+    _znodeValue = value;
+  }
+  
+  @Override
+  public String toString()
+  {
+    if (_singleValue != null)
+    {
+      return _singleValue;
+    }
+    else if (_listValue != null)
+    {
+      return _listValue.toString();
+    }
+    else if (_mapValue != null)
+    {
+      return _mapValue.toString();
+    }
+    else if (_znodeValue != null)
+    {
+      return _znodeValue.toString();
+    }
+    
+    return "null";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/package-info.java b/helix-core/src/main/java/org/apache/helix/tools/package-info.java
new file mode 100644
index 0000000..ccf303c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix tools classes
+ * 
+ */
+package org.apache.helix.tools;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
new file mode 100644
index 0000000..62c858f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -0,0 +1,181 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+
+public final class HelixUtil
+{
+  private HelixUtil()
+  {
+  }
+
+  public static String getPropertyPath(String clusterName, PropertyType type)
+  {
+    return "/" + clusterName + "/" + type.toString();
+  }
+
+  public static String getInstancePropertyPath(String clusterName, String instanceName,
+      PropertyType type)
+  {
+    return getPropertyPath(clusterName, PropertyType.INSTANCES) + "/" + instanceName + "/"
+        + type.toString();
+  }
+
+  public static String getInstancePath(String clusterName, String instanceName)
+  {
+    return getPropertyPath(clusterName, PropertyType.INSTANCES) + "/" + instanceName;
+  }
+
+  public static String getIdealStatePath(String clusterName, String resourceName)
+  {
+    return getPropertyPath(clusterName, PropertyType.IDEALSTATES) + "/" + resourceName;
+  }
+
+  public static String getIdealStatePath(String clusterName)
+  {
+    return getPropertyPath(clusterName, PropertyType.IDEALSTATES);
+  }
+
+  public static String getLiveInstancesPath(String clusterName)
+  {
+    return getPropertyPath(clusterName, PropertyType.LIVEINSTANCES);
+  }
+
+  // public static String getConfigPath(String clusterName)
+  // {
+  // return getPropertyPath(clusterName, PropertyType.PARTICIPANT_CONFIGS);
+  // }
+
+  // public static String getConfigPath(String clusterName, String instanceName)
+  // {
+  // return getConfigPath(clusterName) + "/" + instanceName;
+  // }
+
+  public static String getMessagePath(String clusterName, String instanceName)
+  {
+    return getInstancePropertyPath(clusterName, instanceName, PropertyType.MESSAGES);
+  }
+
+  public static String getCurrentStateBasePath(String clusterName, String instanceName)
+  {
+    return getInstancePropertyPath(clusterName, instanceName, PropertyType.CURRENTSTATES);
+  }
+
+  /**
+   * Even though this is simple we want to have the mechanism of bucketing the
+   * partitions. If we have P partitions and N nodes with K replication factor
+   * and D databases. Then on each node we will have (P/N)*K*D partitions. And
+   * cluster manager neeeds to maintain watch on each of these nodes for every
+   * node. So over all cluster manager will have P*K*D watches which can be
+   * quite large given that we over partition.
+   * 
+   * The other extreme is having one znode per storage per database. This will
+   * result in N*D watches which is good. But data in every node might become
+   * really big since it has to save partition
+   * 
+   * Ideally we want to balance between the two models
+   * 
+   */
+  public static String getCurrentStatePath(String clusterName, String instanceName,
+      String sessionId, String stateUnitKey)
+  {
+    return getInstancePropertyPath(clusterName, instanceName, PropertyType.CURRENTSTATES) + "/"
+        + sessionId + "/" + stateUnitKey;
+  }
+
+  public static String getExternalViewPath(String clusterName)
+  {
+    return getPropertyPath(clusterName, PropertyType.EXTERNALVIEW);
+  }
+
+  public static String getStateModelDefinitionPath(String clusterName)
+  {
+    return getPropertyPath(clusterName, PropertyType.STATEMODELDEFS);
+  }
+
+  public static String getExternalViewPath(String clusterName, String resourceName)
+  {
+    return getPropertyPath(clusterName, PropertyType.EXTERNALVIEW) + "/" + resourceName;
+  }
+
+  public static String getLiveInstancePath(String clusterName, String instanceName)
+  {
+    return getPropertyPath(clusterName, PropertyType.LIVEINSTANCES) + "/" + instanceName;
+  }
+
+  public static String getMemberInstancesPath(String clusterName)
+  {
+    return getPropertyPath(clusterName, PropertyType.INSTANCES);
+  }
+
+  public static String getErrorsPath(String clusterName, String instanceName)
+  {
+    return getInstancePropertyPath(clusterName, instanceName, PropertyType.ERRORS);
+  }
+
+  public static String getStatusUpdatesPath(String clusterName, String instanceName)
+  {
+    return getInstancePropertyPath(clusterName, instanceName, PropertyType.STATUSUPDATES);
+  }
+
+  public static String getHealthPath(String clusterName, String instanceName)
+  {
+    return PropertyPathConfig.getPath(PropertyType.HEALTHREPORT, clusterName, instanceName);
+  }
+
+  public static String getPersistentStatsPath(String clusterName)
+  {
+    return PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
+  }
+
+  public static String getAlertsPath(String clusterName)
+  {
+    return PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
+  }
+
+  public static String getAlertStatusPath(String clusterName)
+  {
+    return PropertyPathConfig.getPath(PropertyType.ALERT_STATUS, clusterName);
+  }
+
+  public static String getInstanceNameFromPath(String path)
+  {
+    // path structure
+    // /<cluster_name>/instances/<instance_name>/[currentStates/messages]
+    if (path.contains("/" + PropertyType.INSTANCES + "/"))
+    {
+      String[] split = path.split("\\/");
+      if (split.length > 3)
+      {
+        return split[3];
+      }
+    }
+    return null;
+  }
+
+  // distributed cluster controller
+  public static String getControllerPath(String clusterName)
+  {
+    return getPropertyPath(clusterName, PropertyType.CONTROLLER);
+  }
+
+  public static String getControllerPropertyPath(String clusterName, PropertyType type)
+  {
+    return PropertyPathConfig.getPath(type, clusterName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
new file mode 100644
index 0000000..d3060aa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
@@ -0,0 +1,629 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.Error;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Util class to create statusUpdates ZK records and error ZK records. These message
+ * records are for diagnostics only, and they are stored on the "StatusUpdates" and
+ * "errors" ZNodes in the zookeeper instances.
+ * 
+ * 
+ * */
+public class StatusUpdateUtil
+{
+  static Logger _logger = Logger.getLogger(StatusUpdateUtil.class);
+
+  public static class Transition implements Comparable<Transition>
+  {
+    private final String _msgID;
+    private final long   _timeStamp;
+    private final String _from;
+    private final String _to;
+
+    public Transition(String msgID, long timeStamp, String from, String to)
+    {
+      this._msgID = msgID;
+      this._timeStamp = timeStamp;
+      this._from = from;
+      this._to = to;
+    }
+
+    @Override
+    public int compareTo(Transition t)
+    {
+      if (_timeStamp < t._timeStamp)
+        return -1;
+      else if (_timeStamp > t._timeStamp)
+        return 1;
+      else
+        return 0;
+    }
+
+    public boolean equals(Transition t)
+    {
+      return (_timeStamp == t._timeStamp && _from.equals(t._from) && _to.equals(t._to));
+    }
+
+    public String getFromState()
+    {
+      return _from;
+    }
+
+    public String getToState()
+    {
+      return _to;
+    }
+
+    public String getMsgID()
+    {
+      return _msgID;
+    }
+
+    @Override
+    public String toString()
+    {
+      return _msgID + ":" + _timeStamp + ":" + _from + "->" + _to;
+    }
+  }
+
+  public static enum TaskStatus
+  {
+    UNKNOWN, NEW, SCHEDULED, INVOKING, COMPLETED, FAILED
+  }
+
+  public static class StatusUpdateContents
+  {
+    private final List<Transition>        _transitions;
+    private final Map<String, TaskStatus> _taskMessages;
+
+    private StatusUpdateContents(List<Transition> transitions,
+                                 Map<String, TaskStatus> taskMessages)
+    {
+      this._transitions = transitions;
+      this._taskMessages = taskMessages;
+    }
+
+    public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
+                                                               String instance,
+                                                               String resourceGroup,
+                                                               String partition)
+    {
+      return getStatusUpdateContents(accessor, instance, resourceGroup, null, partition);
+    }
+
+    // TODO: We should build a map and return the key instead of searching
+    // everytime
+    // for an (instance, resourceGroup, session, partition) tuple.
+    // But such a map is very similar to what exists in ZNRecord
+    // passing null for sessionID results in searching across all sessions
+    public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
+                                                               String instance,
+                                                               String resourceGroup,
+                                                               String sessionID,
+                                                               String partition)
+    {
+      Builder keyBuilder = accessor.keyBuilder();
+
+      List<ZNRecord> instances =
+          HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
+      List<ZNRecord> partitionRecords = new ArrayList<ZNRecord>();
+      for (ZNRecord znRecord : instances)
+      {
+        String instanceName = znRecord.getId();
+        if (!instanceName.equals(instance))
+        {
+          continue;
+        }
+
+        List<String> sessions = accessor.getChildNames(keyBuilder.sessions(instanceName));
+        for (String session : sessions)
+        {
+          if (sessionID != null && !session.equals(sessionID))
+          {
+            continue;
+          }
+
+          List<String> resourceGroups =
+              accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName,
+                                                                      session));
+          for (String resourceGroupName : resourceGroups)
+          {
+            if (!resourceGroupName.equals(resourceGroup))
+            {
+              continue;
+            }
+
+            List<String> partitionStrings =
+                accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName,
+                                                                        session,
+                                                                        resourceGroupName));
+
+            for (String partitionString : partitionStrings)
+            {
+              ZNRecord partitionRecord =
+                  accessor.getProperty(keyBuilder.stateTransitionStatus(instanceName,
+                                                                        session,
+                                                                        resourceGroupName,
+                                                                        partitionString))
+                          .getRecord();
+              if (!partitionString.equals(partition))
+              {
+                continue;
+              }
+              partitionRecords.add(partitionRecord);
+            }
+          }
+        }
+      }
+
+      return new StatusUpdateContents(getSortedTransitions(partitionRecords),
+                                      getTaskMessages(partitionRecords));
+    }
+
+    public List<Transition> getTransitions()
+    {
+      return _transitions;
+    }
+
+    public Map<String, TaskStatus> getTaskMessages()
+    {
+      return _taskMessages;
+    }
+
+    // input: List<ZNRecord> corresponding to (instance, database,
+    // partition) tuples across all sessions
+    // return list of transitions sorted from earliest to latest
+    private static List<Transition> getSortedTransitions(List<ZNRecord> partitionRecords)
+    {
+      List<Transition> transitions = new ArrayList<Transition>();
+      for (ZNRecord partition : partitionRecords)
+      {
+        Map<String, Map<String, String>> mapFields = partition.getMapFields();
+        for (String key : mapFields.keySet())
+        {
+          if (key.startsWith("MESSAGE"))
+          {
+            Map<String, String> m = mapFields.get(key);
+            long createTimeStamp = 0;
+            try
+            {
+              createTimeStamp = Long.parseLong(m.get("CREATE_TIMESTAMP"));
+            }
+            catch (Exception e)
+            {
+            }
+            transitions.add(new Transition(m.get("MSG_ID"),
+                                           createTimeStamp,
+                                           m.get("FROM_STATE"),
+                                           m.get("TO_STATE")));
+          }
+        }
+      }
+      Collections.sort(transitions);
+      return transitions;
+    }
+
+    private static Map<String, TaskStatus> getTaskMessages(List<ZNRecord> partitionRecords)
+    {
+      Map<String, TaskStatus> taskMessages = new HashMap<String, TaskStatus>();
+      for (ZNRecord partition : partitionRecords)
+      {
+        Map<String, Map<String, String>> mapFields = partition.getMapFields();
+        // iterate over the task status updates in the order they occurred
+        // so that the last status can be recorded
+        for (String key : mapFields.keySet())
+        {
+          if (key.contains("STATE_TRANSITION"))
+          {
+            Map<String, String> m = mapFields.get(key);
+            String id = m.get("MSG_ID");
+            String statusString = m.get("AdditionalInfo");
+            TaskStatus status = TaskStatus.UNKNOWN;
+            if (statusString.contains("scheduled"))
+              status = TaskStatus.SCHEDULED;
+            else if (statusString.contains("invoking"))
+              status = TaskStatus.INVOKING;
+            else if (statusString.contains("completed"))
+              status = TaskStatus.COMPLETED;
+
+            taskMessages.put(id, status);
+          }
+        }
+      }
+      return taskMessages;
+    }
+  }
+
+  public enum Level
+  {
+    HELIX_ERROR, HELIX_WARNING, HELIX_INFO
+  }
+
+  /**
+   * Creates an empty ZNRecord as the statusUpdate/error record
+   * 
+   * @param id
+   */
+  public ZNRecord createEmptyStatusUpdateRecord(String id)
+  {
+    return new ZNRecord(id);
+  }
+
+  /**
+   * Create a ZNRecord for a message, which stores the content of the message (stored in
+   * simple fields) into the ZNRecord mapFields. In this way, the message update can be
+   * merged with the previous status update record in the zookeeper. See ZNRecord.merge()
+   * for more details.
+   * */
+  ZNRecord createMessageLogRecord(Message message)
+  {
+    ZNRecord result = new ZNRecord(getStatusUpdateRecordName(message));
+    String mapFieldKey = "MESSAGE " + message.getMsgId();
+    result.setMapField(mapFieldKey, new TreeMap<String, String>());
+
+    // Store all the simple fields of the message in the new ZNRecord's map
+    // field.
+    for (String simpleFieldKey : message.getRecord().getSimpleFields().keySet())
+    {
+      result.getMapField(mapFieldKey).put(simpleFieldKey,
+                                          message.getRecord()
+                                                 .getSimpleField(simpleFieldKey));
+    }
+    if (message.getResultMap() != null)
+    {
+      result.setMapField("MessageResult", message.getResultMap());
+    }
+    return result;
+  }
+
+  Map<String, String> _recordedMessages = new ConcurrentHashMap<String, String>();
+
+  /**
+   * Create a statusupdate that is related to a cluster manager message.
+   * 
+   * @param message
+   *          the related cluster manager message
+   * @param level
+   *          the error level
+   * @param classInfo
+   *          class info about the class that reports the status update
+   * @param additional
+   *          info the additional debug information
+   */
+  public ZNRecord createMessageStatusUpdateRecord(Message message,
+                                                  Level level,
+                                                  Class classInfo,
+                                                  String additionalInfo)
+  {
+    ZNRecord result = createEmptyStatusUpdateRecord(getStatusUpdateRecordName(message));
+    Map<String, String> contentMap = new TreeMap<String, String>();
+
+    contentMap.put("Message state", message.getMsgState().toString());
+    contentMap.put("AdditionalInfo", additionalInfo);
+    contentMap.put("Class", classInfo.toString());
+    contentMap.put("MSG_ID", message.getMsgId());
+
+    DateFormat formatter = new SimpleDateFormat("yyyyMMdd-HHmmss.SSSSSS");
+    String time = formatter.format(new Date());
+
+    String id =
+        String.format("%4s %26s ", level.toString(), time)
+            + getRecordIdForMessage(message);
+
+    result.setMapField(id, contentMap);
+
+    return result;
+  }
+
+  String getRecordIdForMessage(Message message)
+  {
+    if (message.getMsgType().equals(MessageType.STATE_TRANSITION))
+    {
+      return message.getPartitionName() + " Trans:" + message.getFromState().charAt(0)
+          + "->" + message.getToState().charAt(0) + "  " + UUID.randomUUID().toString();
+    }
+    else
+    {
+      return message.getMsgType() + " " + UUID.randomUUID().toString();
+    }
+  }
+
+  /**
+   * Create a statusupdate that is related to a cluster manager message, then record it to
+   * the zookeeper store.
+   * 
+   * @param message
+   *          the related cluster manager message
+   * @param level
+   *          the error level
+   * @param classInfo
+   *          class info about the class that reports the status update
+   * @param additional
+   *          info the additional debug information
+   * @param accessor
+   *          the zookeeper data accessor that writes the status update to zookeeper
+   */
+  public void logMessageStatusUpdateRecord(Message message,
+                                           Level level,
+                                           Class classInfo,
+                                           String additionalInfo,
+                                           HelixDataAccessor accessor)
+  {
+    try
+    {
+      ZNRecord record =
+          createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
+      publishStatusUpdateRecord(record, message, level, accessor);
+    }
+    catch (Exception e)
+    {
+      _logger.error("Exception while logging status update", e);
+    }
+  }
+
+  public void logError(Message message,
+                       Class classInfo,
+                       String additionalInfo,
+                       HelixDataAccessor accessor)
+  {
+    logMessageStatusUpdateRecord(message,
+                                 Level.HELIX_ERROR,
+                                 classInfo,
+                                 additionalInfo,
+                                 accessor);
+  }
+
+  public void logError(Message message,
+                       Class classInfo,
+                       Exception e,
+                       String additionalInfo,
+                       HelixDataAccessor accessor)
+  {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    e.printStackTrace(pw);
+    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo
+        + sw.toString(), accessor);
+  }
+
+  public void logInfo(Message message,
+                      Class classInfo,
+                      String additionalInfo,
+                      HelixDataAccessor accessor)
+  {
+    logMessageStatusUpdateRecord(message,
+                                 Level.HELIX_INFO,
+                                 classInfo,
+                                 additionalInfo,
+                                 accessor);
+  }
+
+  public void logWarning(Message message,
+                         Class classInfo,
+                         String additionalInfo,
+                         HelixDataAccessor accessor)
+  {
+    logMessageStatusUpdateRecord(message,
+                                 Level.HELIX_WARNING,
+                                 classInfo,
+                                 additionalInfo,
+                                 accessor);
+  }
+
+  /**
+   * Write a status update record to zookeeper to the zookeeper store.
+   * 
+   * @param record
+   *          the status update record
+   * @param message
+   *          the message to be logged
+   * @param level
+   *          the error level of the message update
+   * @param accessor
+   *          the zookeeper data accessor that writes the status update to zookeeper
+   */
+  void publishStatusUpdateRecord(ZNRecord record,
+                                 Message message,
+                                 Level level,
+                                 HelixDataAccessor accessor)
+  {
+    String instanceName = message.getTgtName();
+    String statusUpdateSubPath = getStatusUpdateSubPath(message);
+    String statusUpdateKey = getStatusUpdateKey(message);
+    String sessionId = message.getExecutionSessionId();
+    if (sessionId == null)
+    {
+      sessionId = message.getTgtSessionId();
+    }
+    if (sessionId == null)
+    {
+      sessionId = "*";
+    }
+
+    Builder keyBuilder = accessor.keyBuilder();
+    if (!_recordedMessages.containsKey(message.getMsgId()))
+    {
+      // TODO instanceName of a controller might be any string
+      if (instanceName.equalsIgnoreCase("Controller"))
+      {
+        accessor.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath,
+                                                                statusUpdateKey),
+                                new StatusUpdate(createMessageLogRecord(message)));
+
+      }
+      else
+      {
+        
+        PropertyKey propertyKey =
+            keyBuilder.stateTransitionStatus(instanceName,
+                                             sessionId,
+                                             statusUpdateSubPath,
+                                             statusUpdateKey);
+
+        ZNRecord statusUpdateRecord = createMessageLogRecord(message);
+
+        // For now write participant StatusUpdates to log4j. 
+        // we are using restlet as another data channel to report to controller.
+        
+        _logger.info("StatusUpdate path:" + propertyKey.getPath() + ", updates:"
+              + statusUpdateRecord);
+        accessor.updateProperty(propertyKey, new StatusUpdate(statusUpdateRecord));
+        
+      }
+      _recordedMessages.put(message.getMsgId(), message.getMsgId());
+    }
+
+    if (instanceName.equalsIgnoreCase("Controller"))
+    {
+      accessor.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath,
+                                                              statusUpdateKey),
+                              new StatusUpdate(record));
+    }
+    else
+    {
+      
+      PropertyKey propertyKey =
+          keyBuilder.stateTransitionStatus(instanceName,
+                                           sessionId,
+                                           statusUpdateSubPath,
+                                           statusUpdateKey);
+      // For now write participant StatusUpdates to log4j. 
+      // we are using restlet as another data channel to report to controller.
+      _logger.info("StatusUpdate path:" + propertyKey.getPath() + ", updates:" + record);
+      accessor.updateProperty(propertyKey, new StatusUpdate(record));
+    }
+
+    // If the error level is ERROR, also write the record to "ERROR" ZNode
+    if (Level.HELIX_ERROR == level)
+    {
+      publishErrorRecord(record, message, accessor);
+    }
+  }
+
+  private String getStatusUpdateKey(Message message)
+  {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
+    {
+      return message.getPartitionName();
+    }
+    return message.getMsgId();
+  }
+
+  /**
+   * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
+   * 
+   */
+  String getStatusUpdateSubPath(Message message)
+  {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
+    {
+      return message.getResourceName();
+    }
+    else
+    {
+      return message.getMsgType();
+    }
+  }
+
+  String getStatusUpdateRecordName(Message message)
+  {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
+    {
+      return message.getTgtSessionId() + "__" + message.getResourceName();
+    }
+    return message.getMsgId();
+  }
+
+  /**
+   * Write an error record to zookeeper to the zookeeper store.
+   * 
+   * @param record
+   *          the status update record
+   * @param message
+   *          the message to be logged
+   * @param accessor
+   *          the zookeeper data accessor that writes the status update to zookeeper
+   */
+  void publishErrorRecord(ZNRecord record, Message message, HelixDataAccessor accessor)
+  {
+    String instanceName = message.getTgtName();
+    String statusUpdateSubPath = getStatusUpdateSubPath(message);
+    String statusUpdateKey = getStatusUpdateKey(message);
+    String sessionId = message.getExecutionSessionId();
+    if (sessionId == null)
+    {
+      sessionId = message.getTgtSessionId();
+    }
+    if (sessionId == null)
+    {
+      sessionId = "*";
+    }
+
+    Builder keyBuilder = accessor.keyBuilder();
+
+    // TODO remove the hard code: "controller"
+    if (instanceName.equalsIgnoreCase("controller"))
+    {
+      // TODO need to fix: ERRORS_CONTROLLER doesn't have a form of
+      // ../{sessionId}/{subPath}
+      // accessor.setProperty(PropertyType.ERRORS_CONTROLLER, record,
+      // statusUpdateSubPath);
+      accessor.setProperty(keyBuilder.controllerTaskError(statusUpdateSubPath),
+                           new Error(record));
+    }
+    else
+    {
+      // accessor.updateProperty(PropertyType.ERRORS,
+      // record,
+      // instanceName,
+      // sessionId,
+      // statusUpdateSubPath,
+      // statusUpdateKey);
+      accessor.updateProperty(keyBuilder.stateTransitionError(instanceName,
+                                                              sessionId,
+                                                              statusUpdateSubPath,
+                                                              statusUpdateKey),
+                              new Error(record));
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/StringTemplate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/StringTemplate.java b/helix-core/src/main/java/org/apache/helix/util/StringTemplate.java
new file mode 100644
index 0000000..7ceef93
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/StringTemplate.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+public class StringTemplate
+{
+  private static Logger LOG = Logger.getLogger(StringTemplate.class);
+
+  Map<Enum, Map<Integer, String>> templateMap = new HashMap<Enum, Map<Integer, String>>();
+  static Pattern pattern = Pattern.compile("(\\{.+?\\})");
+
+  public void addEntry(Enum type, int numKeys, String template)
+  {
+    if (!templateMap.containsKey(type))
+    {
+      templateMap.put(type, new HashMap<Integer, String>());
+    }
+    LOG.trace("Add template for type: " + type.name() + ", arguments: " + numKeys
+        + ", template: " + template);
+    templateMap.get(type).put(numKeys, template);
+  }
+
+  public String instantiate(Enum type, String... keys)
+  {
+    if (keys == null)
+    {
+      keys = new String[] {};
+    }
+
+    String template = null;
+    if (templateMap.containsKey(type))
+    {
+      template = templateMap.get(type).get(keys.length);
+    }
+
+    String result = null;
+
+    if (template != null)
+    {
+      result = template;
+      Matcher matcher = pattern.matcher(template);
+      int count = 0;
+      while (matcher.find())
+      {
+        String var = matcher.group();
+        result = result.replace(var, keys[count]);
+        count++;
+      }
+    }
+
+    if (result == null || result.indexOf('{') > -1 || result.indexOf('}') > -1)
+    {
+      String errMsg = "Unable to instantiate template: " + template
+          + " using keys: " + Arrays.toString(keys);
+      LOG.error(errMsg);
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
new file mode 100644
index 0000000..ea1368f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.zookeeper.ZooKeeper.States;
+
+
+public class ZKClientPool
+{
+  static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<String, ZkClient>();
+  static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
+
+  public static ZkClient getZkClient(String zkServer)
+  {
+    // happy path that we cache the zkclient and it's still connected
+    if (_zkClientMap.containsKey(zkServer))
+    {
+      ZkClient zkClient = _zkClientMap.get(zkServer);
+      if (zkClient.getConnection().getZookeeperState() == States.CONNECTED)
+      {
+        return zkClient;
+      }
+    }
+
+    synchronized (_zkClientMap)
+    {
+      // if we cache a stale zkclient, purge it
+      if (_zkClientMap.containsKey(zkServer))
+      {
+        ZkClient zkClient = _zkClientMap.get(zkServer);
+        if (zkClient.getConnection().getZookeeperState() != States.CONNECTED)
+        {
+          _zkClientMap.remove(zkServer);
+        }
+      }
+
+      // get a new zkclient
+      if (!_zkClientMap.containsKey(zkServer))
+      {
+        ZkClient zkClient = new ZkClient(zkServer, DEFAULT_SESSION_TIMEOUT, 
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+
+        _zkClientMap.put(zkServer, zkClient);
+      }
+      return _zkClientMap.get(zkServer);
+    }
+  }
+
+  public static void reset()
+  {
+    _zkClientMap.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
new file mode 100644
index 0000000..bc8bc11
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.util;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+//TODO find a proper place for these methods
+public final class ZNRecordUtil
+{
+  private static final Logger logger = Logger.getLogger(ZNRecordUtil.class.getName());
+
+  private ZNRecordUtil()
+  {
+  }
+
+  public static ZNRecord find(String id, List<ZNRecord> list)
+  {
+    for (ZNRecord record : list)
+    {
+      if (record.getId() != null && record.getId().equals(id))
+      {
+        return record;
+      }
+    }
+    return null;
+  }
+
+  public static Map<String, ZNRecord> convertListToMap(List<ZNRecord> recordList)
+  {
+    Map<String, ZNRecord> recordMap = new HashMap<String, ZNRecord>();
+    for (ZNRecord record : recordList)
+    {
+      if (record.getId() != null)
+      {
+        recordMap.put(record.getId(), record);
+      }
+    }
+    return recordMap;
+  }
+
+  public static <T extends Object> List<T> convertListToTypedList(List<ZNRecord> recordList,
+                                                                  Class<T> clazz)
+  {
+    List<T> list = new ArrayList<T>();
+    for (ZNRecord record : recordList)
+    {
+      if (record.getId() == null)
+      {
+        logger.error("Invalid record: Id missing in " + record);
+        continue;
+      }
+      try
+      {
+
+        Constructor<T> constructor = clazz.getConstructor(new Class[] { ZNRecord.class });
+        T instance = constructor.newInstance(record);
+        list.add(instance);
+      }
+      catch (Exception e)
+      {
+        logger.error("Error creating an Object of type:" + clazz.getCanonicalName(), e);
+      }
+    }
+    return list;
+  }
+
+  public static <T extends Object> Map<String, T> convertListToTypedMap(List<ZNRecord> recordList,
+                                                                        Class<T> clazz)
+  {
+    Map<String, T> map = new HashMap<String, T>();
+    for (ZNRecord record : recordList)
+    {
+      if (record.getId() == null)
+      {
+        logger.error("Invalid record: Id missing in " + record);
+        continue;
+      }
+      try
+      {
+
+        Constructor<T> constructor = clazz.getConstructor(new Class[] { ZNRecord.class });
+        T instance = constructor.newInstance(record);
+        map.put(record.getId(), instance);
+      }
+      catch (Exception e)
+      {
+        logger.error("Error creating an Object of type:" + clazz.getCanonicalName(), e);
+      }
+    }
+    return map;
+  }
+
+  public static <T extends Object> List<T> convertMapToList(Map<String, T> map)
+  {
+    List<T> list = new ArrayList<T>();
+    for (T t : map.values())
+    {
+      list.add(t);
+    }
+    return list;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/util/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/package-info.java b/helix-core/src/main/java/org/apache/helix/util/package-info.java
new file mode 100644
index 0000000..eb14cb3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix utility classes
+ * 
+ */
+package org.apache.helix.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/AppTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/AppTest.java b/helix-core/src/test/java/org/apache/helix/AppTest.java
new file mode 100644
index 0000000..80961c6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/AppTest.java
@@ -0,0 +1,189 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.helix.model.Message;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+{
+	/**
+	 * Create the test case
+	 * 
+	 * @param testName
+	 *          name of the test case
+	 */
+	public AppTest(String testName)
+	{
+	}
+
+	@Test(enabled = false)
+  private static void testChrootWithZkClient() throws Exception
+	{
+		ZkClient client = new ZkClient("localhost:2181/foo");
+		IZkStateListener stateChangeListener = new IZkStateListener()
+		{
+
+			@Override
+			public void handleStateChanged(KeeperState state) throws Exception
+			{
+				System.out
+				    .println("AppTest.main(...).new IZkStateListener() {...}.handleStateChanged()"
+				        + state);
+			}
+
+			@Override
+			public void handleNewSession() throws Exception
+			{
+				System.out
+				    .println("AppTest.main(...).new IZkStateListener() {...}.handleNewSession()");
+			}
+		};
+		client.subscribeStateChanges(stateChangeListener);
+		boolean waitUntilConnected = client.waitUntilConnected(10000,
+		    TimeUnit.MILLISECONDS);
+		System.out.println("Connected " + waitUntilConnected);
+		client.waitForKeeperState(KeeperState.Disconnected, 20000,
+		    TimeUnit.MILLISECONDS);
+		// server.start();
+		client.waitUntilConnected();
+		Thread.currentThread().join();
+	}
+
+	@Test(enabled = false)
+  private static void testChroot() throws Exception
+	{
+		Watcher watcher = new Watcher()
+		{
+			@Override
+			public void process(WatchedEvent event)
+			{
+				System.out.println("Event:" + event);
+			}
+		};
+		ZooKeeper zk = new ZooKeeper("localhost:2181/foo", 6000, watcher);
+		// uncommenting this line will not cause infinite connect/disconnect
+		// zk.create("/", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+		zk.exists("/", true);
+		System.out
+		    .println("Stop the server and restart it when you see this message");
+		Thread.currentThread().join();
+	}
+
+	@Test(enabled = false)
+  private static void testZKClient() throws InterruptedException
+	{
+		IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+		{
+			@Override
+			public void createDefaultNameSpace(ZkClient zkClient)
+			{
+			}
+		};
+		String dataDir = "/tmp/dataDir";
+		String logDir = "/tmp/logDir";
+		// ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace, 2181);
+		// server.start();
+
+		ZkClient client = new ZkClient("localhost:2181/foo");
+		IZkStateListener stateChangeListener = new IZkStateListener()
+		{
+
+			@Override
+			public void handleStateChanged(KeeperState state) throws Exception
+			{
+				System.out
+				    .println("AppTest.main(...).new IZkStateListener() {...}.handleStateChanged()"
+				        + state);
+			}
+
+			@Override
+			public void handleNewSession() throws Exception
+			{
+				System.out
+				    .println("AppTest.main(...).new IZkStateListener() {...}.handleNewSession()");
+			}
+		};
+		client.subscribeStateChanges(stateChangeListener);
+		boolean waitUntilConnected = client.waitUntilConnected(10000,
+		    TimeUnit.MILLISECONDS);
+		System.out.println("Connected " + waitUntilConnected);
+		IZkChildListener listener1 = new IZkChildListener()
+		{
+
+			@Override
+			public void handleChildChange(String parentPath,
+			    List<String> currentChilds) throws Exception
+			{
+				System.out.println("listener 1 Change at path:" + parentPath);
+			}
+		};
+		IZkChildListener listener2 = new IZkChildListener()
+		{
+			@Override
+			public void handleChildChange(String parentPath,
+			    List<String> currentChilds) throws Exception
+			{
+				System.out.println("listener2 Change at path:" + parentPath);
+			}
+		};
+
+		client.subscribeChildChanges("/", listener1);
+		client.subscribeChildChanges("/foo", listener2);
+
+		// server.shutdown();
+		client.waitForKeeperState(KeeperState.Disconnected, 20000,
+		    TimeUnit.MILLISECONDS);
+		// server.start();
+		client.waitUntilConnected();
+
+		Thread.sleep(1000);
+		client.setZkSerializer(new BytesPushThroughSerializer());
+		client.create("/test", new byte[0], CreateMode.EPHEMERAL);
+		Thread.sleep(1000);
+	}
+
+	public static void main(String[] args) throws Exception
+	{
+		//testChroot();
+		// testZKClient();
+		// testChrootWithZkClient();
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
new file mode 100644
index 0000000..8dda442
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.mock.storage.DummyProcess.DummyLeaderStandbyStateModelFactory;
+import org.apache.helix.mock.storage.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.mock.storage.DummyProcess.DummyStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+public class DummyProcessThread implements Runnable
+{
+  private static final Logger LOG = Logger.getLogger(DummyProcessThread.class);
+
+  HelixManager _manager;
+  String _instanceName;
+
+  public DummyProcessThread(HelixManager manager, String instanceName)
+  {
+    _manager = manager;
+    _instanceName = instanceName;
+  }
+
+  @Override
+  public void run()
+  {
+    try
+    {
+      DummyStateModelFactory stateModelFactory = new DummyStateModelFactory(0);
+//      StateMachineEngine genericStateMachineHandler =
+//          new StateMachineEngine();
+      StateMachineEngine stateMach = _manager.getStateMachineEngine();
+      stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+
+      DummyLeaderStandbyStateModelFactory stateModelFactory1 = new DummyLeaderStandbyStateModelFactory(10);
+      DummyOnlineOfflineStateModelFactory stateModelFactory2 = new DummyOnlineOfflineStateModelFactory(10);
+      stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
+      stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+//      _manager.getMessagingService()
+//              .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+//                                             genericStateMachineHandler);
+
+      _manager.connect();
+      Thread.currentThread().join();
+    }
+    catch (InterruptedException e)
+    {
+      String msg =
+          "participant:" + _instanceName + ", " + Thread.currentThread().getName()
+              + " interrupted";
+      LOG.info(msg);
+      // System.err.println(msg);
+    }
+    catch (Exception e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/ExternalCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ExternalCommand.java b/helix-core/src/test/java/org/apache/helix/ExternalCommand.java
new file mode 100644
index 0000000..5183cde
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/ExternalCommand.java
@@ -0,0 +1,389 @@
+package org.apache.helix;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.log4j.Logger;
+
+/**
+* This class encapsulates a java <code>Process</code> to handle properly
+* output and error and preventing potential deadlocks. The idea is that the
+* command is executed and you can get the error or output. Simple to use.
+* Should not be used for big outputs because they are buffered internally.
+*
+* @author ypujante@linkedin.com
+*/
+public class ExternalCommand
+{
+  public static final String MODULE = ExternalCommand.class.getName();
+  public static final Logger LOG = Logger.getLogger(MODULE);
+
+  private final ProcessBuilder _processBuilder;
+
+  private Process _process;
+  private InputReader _out;
+  private InputReader _err;
+
+  private static class InputReader extends Thread
+  {
+    private static final int BUFFER_SIZE = 2048;
+
+    private final InputStream _in;
+    private final ByteArrayOutputStream _out;
+    private boolean _running = false;
+
+    InputReader(InputStream in)
+    {
+      _in = in;
+      _out = new ByteArrayOutputStream();
+    }
+
+    @Override
+    public void run()
+    {
+      _running = true;
+
+      byte[] buf = new byte[BUFFER_SIZE];
+      int n = 0;
+      try
+      {
+        while((n = _in.read(buf)) != -1)
+          _out.write(buf, 0, n);
+      }
+      catch(IOException e)
+      {
+        LOG.error("error while reading external command", e);
+      }
+
+      _running = false;
+    }
+
+    public byte[] getOutput()
+    {
+      if(_running)
+        throw new IllegalStateException("wait for process to be completed");
+
+      return _out.toByteArray();
+    }
+  }
+  /**
+* Constructor */
+  public ExternalCommand(ProcessBuilder processBuilder)
+  {
+    _processBuilder = processBuilder;
+  }
+
+  /**
+* After creating the command, you have to start it...
+*
+* @throws IOException
+*/
+  public void start() throws IOException
+  {
+    _process = _processBuilder.start();
+    _out = new InputReader(new BufferedInputStream(_process.getInputStream()));
+    _err = new InputReader(new BufferedInputStream(_process.getErrorStream()));
+
+    _out.start();
+    _err.start();
+  }
+
+  /**
+* @see ProcessBuilder
+*/
+  public Map<String, String> getEnvironment()
+  {
+    return _processBuilder.environment();
+  }
+
+  /**
+* @see ProcessBuilder
+*/
+  public File getWorkingDirectory()
+  {
+    return _processBuilder.directory();
+  }
+
+  /**
+* @see ProcessBuilder
+*/
+  public void setWorkingDirectory(File directory)
+  {
+    _processBuilder.directory(directory);
+  }
+
+  /**
+* @see ProcessBuilder
+*/
+  public boolean getRedirectErrorStream()
+  {
+    return _processBuilder.redirectErrorStream();
+  }
+
+  /**
+* @see ProcessBuilder
+*/
+  public void setRedirectErrorStream(boolean redirectErrorStream)
+  {
+    _processBuilder.redirectErrorStream(redirectErrorStream);
+  }
+
+  public byte[] getOutput() throws InterruptedException
+  {
+    waitFor();
+    return _out.getOutput();
+  }
+
+  public byte[] getError() throws InterruptedException
+  {
+    waitFor();
+    return _err.getOutput();
+  }
+
+  /**
+* Returns the output as a string.
+*
+* @param encoding
+* @return encoded string
+* @throws InterruptedException
+* @throws UnsupportedEncodingException
+*/
+  public String getStringOutput(String encoding) throws InterruptedException,
+                                                        UnsupportedEncodingException
+  {
+    return new String(getOutput(), encoding);
+  }
+
+  /**
+* Returns the output as a string. Uses encoding "UTF-8".
+*
+* @return utf8 encoded string
+* @throws InterruptedException
+*/
+  public String getStringOutput() throws InterruptedException
+  {
+    try
+    {
+      return getStringOutput("UTF-8");
+    }
+    catch(UnsupportedEncodingException e)
+    {
+      // should not happen
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+* Returns the error as a string.
+*
+* @param encoding
+* @return error as string
+* @throws InterruptedException
+* @throws UnsupportedEncodingException
+*/
+  public String getStringError(String encoding) throws InterruptedException,
+                                                       UnsupportedEncodingException
+  {
+    return new String(getError(), encoding);
+  }
+
+  /**
+* Returns the error as a string. Uses encoding "UTF-8".
+*
+* @return error as string
+* @throws InterruptedException
+*/
+  public String getStringError() throws InterruptedException
+  {
+    try
+    {
+      return getStringError("UTF-8");
+    }
+    catch(UnsupportedEncodingException e)
+    {
+      // should not happen
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+* Properly waits until everything is complete: joins on the thread that
+* reads the output, joins on the thread that reads the error and finally
+* wait for the process to be finished.
+* @return the status code of the process.
+*
+* @throws InterruptedException
+*/
+  public int waitFor() throws InterruptedException
+  {
+    if(_process == null)
+      throw new IllegalStateException("you must call start first");
+
+    _out.join();
+    _err.join();
+    return _process.waitFor();
+  }
+
+  /**
+* Properly waits until everything is complete: joins on the thread that
+* reads the output, joins on the thread that reads the error and finally
+* wait for the process to be finished.
+* If the process has not completed before the timeout, throws a
+* {@link TimeoutException}
+* @return the status code of the process.
+*
+* @throws TimeoutException
+* @throws InterruptedException
+*/
+  public int waitFor(long timeout) throws InterruptedException, TimeoutException
+  {
+    if(_process == null)
+      throw new IllegalStateException("you must call start first");
+
+//    Chronos c = new Chronos();
+    _out.join(timeout);
+//    timeout -= c.tick();
+    if (timeout <= 0)
+      throw new TimeoutException("Wait timed out");
+    _err.join(timeout);
+//    timeout -= c.tick();
+    if (timeout <= 0)
+      throw new TimeoutException("Wait timed out");
+
+    // there is no timeout in this API, not much we can do here
+    // waiting on the other two threads should give us some safety
+    return _process.waitFor();
+  }
+
+  public int exitValue()
+  {
+    if(_process == null)
+      throw new IllegalStateException("you must call start first");
+
+    return _process.exitValue();
+  }
+
+  public void destroy()
+  {
+    if(_process == null)
+      throw new IllegalStateException("you must call start first");
+
+    _process.destroy();
+  }
+
+  /**
+* Creates an external process from the command. It is not started and you have to call
+* start on it!
+*
+* @param commands the command to execute
+* @return the process */
+  public static ExternalCommand create(String... commands)
+  {
+    ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+    return ec;
+  }
+
+  /**
+* Creates an external process from the command. It is not started and you have to call
+* start on it!
+*
+* @param commands the command to execute
+* @return the process */
+  public static ExternalCommand create(List<String> commands)
+  {
+    ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+    return ec;
+  }
+
+  /**
+* Creates an external process from the command. The command is executed.
+*
+* @param commands the commands to execute
+* @return the process
+* @throws IOException if there is an error */
+  public static ExternalCommand start(String... commands) throws IOException
+  {
+    ExternalCommand ec = new ExternalCommand(new ProcessBuilder(commands));
+    ec.start();
+    return ec;
+  }
+
+  /**
+* Executes the external command in the given working directory and waits for it to be
+* finished.
+*
+* @param workingDirectory the root directory from where to run the command
+* @param command the command to execute (should be relative to the working directory
+* @param args the arguments to the command
+* @return the process */
+  public static ExternalCommand execute(File workingDirectory,
+                                        String command,
+                                        String... args)
+      throws IOException, InterruptedException
+  {
+    try
+    {
+      return executeWithTimeout(workingDirectory, command, 0, args);
+    }
+    catch (TimeoutException e)
+    {
+      // Can't happen!
+      throw new IllegalStateException(MODULE + ".execute: Unexpected timeout occurred!");
+    }
+  }
+
+/**
+* Executes the external command in the given working directory and waits (until timeout
+* is elapsed) for it to be finished.
+*
+* @param workingDirectory
+* the root directory from where to run the command
+* @param command
+* the command to execute (should be relative to the working directory
+* @param timeout
+* the maximum amount of time to wait for this external command (in ms). If
+* this value is less than or equal to 0, timeout is ignored
+* @param args
+* the arguments to the command
+* @return the process
+*/
+  public static ExternalCommand executeWithTimeout(File workingDirectory,
+                                                   String command,
+                                                   long timeout,
+                                                   String... args)
+      throws IOException, InterruptedException, TimeoutException
+  {
+    List<String> arguments = new ArrayList<String>(args.length + 1);
+
+    arguments.add(new File(workingDirectory, command).getAbsolutePath());
+    arguments.addAll(Arrays.asList(args));
+
+    ExternalCommand cmd = ExternalCommand.create(arguments);
+
+    cmd.setWorkingDirectory(workingDirectory);
+
+    cmd.setRedirectErrorStream(true);
+
+    cmd.start();
+
+    /* Use timeout if it is a valid value! */
+    if (timeout <= 0)
+      cmd.waitFor();
+    else
+      cmd.waitFor(timeout);
+
+    if (LOG.isDebugEnabled())
+      LOG.debug(cmd.getStringOutput());
+
+    return cmd;
+  }
+}