You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ch...@apache.org on 2012/02/25 09:07:52 UTC

svn commit: r1293544 - in /incubator/hama/trunk: contrib/ contrib/monitor-plugin/ contrib/monitor-plugin/jvm-metrics/ contrib/monitor-plugin/jvm-metrics/src/ contrib/monitor-plugin/jvm-metrics/src/main/ contrib/monitor-plugin/jvm-metrics/src/main/java/...

Author: chl501
Date: Sat Feb 25 08:07:51 2012
New Revision: 1293544

URL: http://svn.apache.org/viewvc?rev=1293544&view=rev
Log:
[HAMA-495] provides features similar to ganglia's gmond so that users can monitor 
service.

Added:
    incubator/hama/trunk/contrib/
    incubator/hama/trunk/contrib/monitor-plugin/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/
    incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java
Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java

Added: incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml (added)
+++ incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml Sat Feb 25 08:07:51 2012
@@ -0,0 +1,63 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.hama.monitor.plugin</groupId>
+  <artifactId>jvm-plugin</artifactId>
+  <version>0.1-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>jvm-plugin</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-core</artifactId>
+      <version>0.5.0-incubating-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>JvmTask</mainClass>
+              <packageName>org.apache.hama.monitor.plugin</packageName>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build> 
+</project>

Added: incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java (added)
+++ incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.plugin;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.lang.Thread.State.*;
+import static org.apache.hama.monitor.plugin.JvmTask.Metrics.*;
+import static org.apache.hama.monitor.Monitor.Destination.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.monitor.Metric;
+import org.apache.hama.monitor.MetricsRecord; // TODO: should be moved to org.apache.hama.monitor.metrics package
+import org.apache.hama.monitor.Monitor.Destination;
+import org.apache.hama.monitor.Monitor.Result;
+import org.apache.hama.monitor.Monitor.Task;
+import org.apache.hama.monitor.Monitor.TaskException;
+
+public final class JvmTask extends Task {
+
+  public static final Log LOG = LogFactory.getLog(JvmTask.class);
+
+  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+  private final List<GarbageCollectorMXBean> gcBeans =
+      ManagementFactory.getGarbageCollectorMXBeans();
+  private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+  private static final long M = 1024*1024;
+ 
+  private final Result result;
+
+  public static enum Metrics{
+    MemNonHeapUsedM("Non-heap memory used in MB"),
+    MemNonHeapCommittedM("Non-heap memory committed in MB"),
+    MemHeapUsedM("Heap memory used in MB"),
+    MemHeapCommittedM("Heap memory committed in MB"),
+    GcCount("Total GC count"),
+    GcTimeMillis("Total GC time in milliseconds"),
+    ThreadsNew("Number of new threads"),
+    ThreadsRunnable("Number of runnable threads"),
+    ThreadsBlocked("Number of blocked threads"),
+    ThreadsWaiting("Number of waiting threads"),
+    ThreadsTimedWaiting("Number of timed waiting threads"),
+    ThreadsTerminated("Number of terminated threads");
+
+    private final String desc;
+
+    Metrics(String desc) { this.desc = desc; }
+    public String description() { return desc; }
+
+    @Override public String toString(){
+      return "name "+name()+" description "+desc;
+    }
+  }
+
+  public static final class JvmResult implements Result {
+    final AtomicReference<MetricsRecord> ref = 
+      new AtomicReference<MetricsRecord>();
+
+    @Override
+    public String name() {
+      //return JvmResult.class.getSimpleName();
+      return "jvm"; 
+    }
+
+    public void set(MetricsRecord record) {
+      ref.set(record);
+    }
+
+    @Override
+    public MetricsRecord get() {
+      return this.ref.get();
+    }
+
+    @Override
+    public Destination[] destinations() {
+      return new Destination[]{ ZK, HDFS, JMX };
+    }     
+
+  }
+
+  public JvmTask() {
+    super(JvmTask.class.getSimpleName());
+    this.result = new JvmResult();
+  }
+
+  @Override
+  public Object run() throws TaskException {
+    final MetricsRecord record = new MetricsRecord("jvm", "Jvm metrics stats.");
+    //record.tag("ProcessName", processName);
+    //record.tag("SessionId", sessionId);
+    memory(record);
+    gc(record);
+    threads(record);
+    ((JvmResult)this.result).set(record); // add to results
+    getListener().notify(this.result); // notify monitor
+    return null;
+  }  
+
+  private void memory(final MetricsRecord record){
+    final MemoryUsage memNonHeap = memoryMXBean.getNonHeapMemoryUsage();
+    final MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
+    record.add(new Metric(MemNonHeapUsedM, memNonHeap.getUsed() / M));
+    record.add(new Metric(MemNonHeapCommittedM,
+                           memNonHeap.getCommitted() / M));
+    record.add(new Metric(MemHeapUsedM, memHeap.getUsed() / M));
+    record.add(new Metric(MemHeapCommittedM, memHeap.getCommitted() / M));
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug(MemNonHeapUsedM.description()+": "+memNonHeap.getUsed() / M);
+      LOG.debug(MemNonHeapCommittedM.description()+": "+memNonHeap.getCommitted() / M);
+      LOG.debug(MemHeapUsedM.description()+": "+memHeap.getUsed() / M);
+      LOG.debug(MemHeapCommittedM.description()+": "+memHeap.getCommitted() / M);
+    }
+  }
+
+  private void gc(final MetricsRecord record){
+    long count = 0;
+    long timeMillis = 0;
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      long c = gcBean.getCollectionCount();
+      long t = gcBean.getCollectionTime();
+      String name = gcBean.getName();
+      record.add(new Metric("GcCount"+name, c));
+      record.add(new Metric("GcTimeMillis"+name, t));
+      count += c;
+      timeMillis += t;
+    }
+    record.add(new Metric(GcCount, count));
+    record.add(new Metric(GcTimeMillis, timeMillis));
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug(GcCount.description()+": "+count);
+      LOG.debug(GcTimeMillis.description()+": "+timeMillis);
+    }
+  }
+
+  private void threads(final MetricsRecord record){
+    int threadsNew = 0;
+    int threadsRunnable = 0;
+    int threadsBlocked = 0;
+    int threadsWaiting = 0;
+    int threadsTimedWaiting = 0;
+    int threadsTerminated = 0;
+    long threadIds[] = threadMXBean.getAllThreadIds();
+    for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(threadIds, 0)) {
+      if (threadInfo == null) continue; // race protection
+      Thread.State state = threadInfo.getThreadState();
+      if(NEW.equals(state)){
+        threadsNew++;
+        break;
+      }else if(RUNNABLE.equals(state)){
+        threadsRunnable++;
+        break;
+      }else if(BLOCKED.equals(state)){
+        threadsBlocked++;
+        break;
+      }else if(WAITING.equals(state)){
+        threadsWaiting++;
+        break;
+      }else if(TIMED_WAITING.equals(state)){
+        threadsTimedWaiting++;
+        break;
+      }else if(TERMINATED.equals(state)){
+        threadsTerminated++;
+        break;
+      }
+    }
+
+    record.add(new Metric(ThreadsNew, threadsNew));
+    record.add(new Metric(ThreadsRunnable, threadsRunnable));
+    record.add(new Metric(ThreadsBlocked, threadsBlocked));
+    record.add(new Metric(ThreadsWaiting, threadsWaiting));
+    record.add(new Metric(ThreadsTimedWaiting, threadsTimedWaiting));
+    record.add(new Metric(ThreadsTerminated, threadsTerminated));
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug(ThreadsNew.description()+": "+threadsNew);
+      LOG.debug(ThreadsRunnable.description()+": "+threadsRunnable);
+      LOG.debug(ThreadsBlocked.description()+": "+threadsBlocked);
+      LOG.debug(ThreadsWaiting.description()+": "+threadsWaiting);
+      LOG.debug(ThreadsTimedWaiting.description()+": "+threadsTimedWaiting);
+      LOG.debug(ThreadsTerminated.description()+": "+threadsTerminated);
+    }
+  }
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1293544&r1=1293543&r2=1293544&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sat Feb 25 08:07:51 2012
@@ -57,6 +57,7 @@ import org.apache.hama.bsp.sync.SyncExce
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.monitor.Monitor;
 import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.log4j.LogManager;
@@ -320,6 +321,13 @@ public class GroomServer implements Runn
     this.instructor.bind(DispatchTasksDirective.class,
         new DispatchTasksHandler());
     instructor.start();
+
+    if(conf.getBoolean("bsp.monitor.enabled", true)) {
+      // TODO: conf.get("bsp.monitor.class.impl", "Monitor.class")
+      // so user can switch to customized monitor impl if necessary.
+      new Monitor(conf, zk, this.groomServerName).start();
+    }
+
     this.running = true;
     this.initialized = true;
   }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.monitor.Monitor.Task;
+import org.apache.hama.monitor.Monitor.Result;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Configurator loads and configure jar files.  
+ */
+public final class Configurator {
+
+  public static final Log LOG = LogFactory.getLog(Configurator.class); 
+  public static final String DEFAULT_PLUGINS_DIR = "plugins";
+  private static final ConcurrentMap<String, Long> repos = 
+    new ConcurrentHashMap<String, Long>();
+
+  /**
+   * @param conf file points out the plugin dir location.
+   * @return Map contains jar path and task to be executed.
+   */ 
+  public static Map<String, Task> configure(HamaConfiguration conf, 
+      MonitorListener listener) throws IOException {
+    String hamaHome = System.getProperty("hama.home.dir");
+    String pluginPath = conf.get("bsp.monitor.plugins.dir", 
+      hamaHome+File.separator+DEFAULT_PLUGINS_DIR);
+    File pluginDir = new File(pluginPath);
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    Map<String, Task> taskList = new HashMap<String, Task>();
+    LOG.info("Scanning jar files within "+pluginDir+".");
+    for(File jar: pluginDir.listFiles()) {
+      String jarPath = jar.getPath();
+      Long timestamp = repos.get(jarPath);
+      if(null == timestamp || jar.lastModified() > timestamp) {
+        Task t = load(jar, loader);
+        if(null != t) {
+          t.setListener(listener);
+          taskList.put(jarPath, t);
+          repos.put(jarPath, new Long(jar.lastModified()));
+          LOG.info(jar.getName()+" is loaded.");
+        }
+      }
+    }
+    return taskList;
+  }
+
+  /**
+   * Load jar from specified path.
+   * @param path to the jar file.
+   * @param loader of the current thread.  
+   * @return task to be run.
+   */
+  private static Task load(File path, ClassLoader loader) throws IOException {
+    JarFile jar = new JarFile(path);
+    Manifest manifest = jar.getManifest();
+    String pkg = manifest.getMainAttributes().getValue("Package");
+    String main = manifest.getMainAttributes().getValue("Main-Class");
+    if(null == pkg || null == main ) 
+      throw new NullPointerException("Package or main class not found "+
+      "in menifest file.");
+    String namespace = pkg + File.separator + main;
+    namespace = namespace.replaceAll(File.separator, ".");
+    LOG.info("Task class to be loaded: "+namespace);
+    URLClassLoader child = 
+      new URLClassLoader(new URL[]{path.toURI().toURL()}, loader); 
+    Thread.currentThread().setContextClassLoader(child);
+    Class<?> taskClass = null;
+    try {
+      taskClass = Class.forName(namespace, true, child); // task class
+    } catch (ClassNotFoundException cnfe) {
+      LOG.warn("Task class is not found.", cnfe);
+    }
+    if(null == taskClass) return null;
+
+    try {
+      return (Task)taskClass.newInstance();
+    } catch(InstantiationException ie) {
+      LOG.warn("Unable to instantiate task class."+namespace, ie);
+    } catch(IllegalAccessException iae) {
+      LOG.warn(iae);
+    }
+    return null;
+  }
+  
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+public final class Metric<T> {
+
+  private final String name;
+  private final String description;
+  private final T value;
+
+  public Metric(Enum name, T value){
+    this(name.name(), name.name()+" metric.", value);
+  }
+
+  public Metric(String name, T value){
+    this(name, name+" metric.", value);
+  }
+
+  public Metric(String name, String description, T value){
+    this.name = name;
+    this.description = description;
+    this.value = value;
+  }
+
+  public final String name(){
+    return this.name;
+  } 
+
+  public final String description(){
+    return this.description;
+  }
+
+  public final T value(){
+    return this.value;
+  }
+
+  @Override
+  public boolean equals(Object target){
+   if (target == this)
+      return true;
+    if (null == target)
+      return false;
+    if (getClass() != target.getClass())
+      return false;
+
+    Metric m = (Metric) target;
+    if(!name().equals(m.name))
+      return false;
+    if(!description().equals(m.description))
+      return false;
+
+    return true;
+  }
+
+  @Override 
+  public int hashCode(){
+    int result = 17;
+    result = 37 * result + name.hashCode();
+    result = 37 * result + description.hashCode();
+    return result;
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
+
+/**
+ * Represents a record containing multiple metrics. 
+ */
+public final class MetricsRecord  {
+
+  private final String name, description;
+  private final List<Metric<? extends Number>> metrics = 
+    new CopyOnWriteArrayList<Metric<? extends Number>>();
+  private final List<MetricsTag> tags = new CopyOnWriteArrayList<MetricsTag>();
+
+  public MetricsRecord(String name, String description){
+    this.name = name;
+    this.description = description;
+  }
+
+  public MetricsRecord(String name){
+    this(name, name+" record."); 
+  }
+
+  public final String name(){
+    return name;
+  }
+
+  public final String description(){
+    return description;
+  }
+
+  public final void add(Metric<? extends Number> metric){
+    metrics.add(metric);
+  }
+
+  public final void add(List<Metric<? extends Number>> metrics){
+    metrics.addAll(metrics);
+  }
+
+  public final void tag(Enum key, String value){
+    tags.add(new MetricsTag(key.toString(), value));
+  }
+
+  public final void tag(String key, String value){
+    tags.add(new MetricsTag(key, value));
+  }
+
+  public final Collection<MetricsTag> tags(){
+    return Collections.unmodifiableList(tags);
+  }
+
+  public final List<Metric<? extends Number>> metrics(){
+    return Collections.unmodifiableList(metrics);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+/**
+ * Information that classifies MetricsRecord.
+ */
+public final class MetricsTag {
+  private final String name;
+  private final String value;
+
+  public MetricsTag(String name, String value){
+    this.name = name;
+    this.value = value;
+  }
+
+  public final String name(){
+    return this.name;
+  }
+
+  public final String value(){
+    return this.value;
+  }
+
+  @Override
+  public boolean equals(Object target){
+   if (target == this) return true;
+    if (null == target) return false;
+    if (getClass() != target.getClass()) return false;
+ 
+    MetricsTag t = (MetricsTag) target;
+    if(!t.name.equals(name)) return false;
+    if(!t.value.equals(value)) return false;
+    return true;
+  }
+
+  @Override
+  public int hashCode(){
+    int result = 17;
+    result = 37 * result + name().hashCode();
+    result = 37 * result + value().hashCode();
+    return result; 
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hama.monitor.Monitor.Destination.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.util.Bytes;
+import org.apache.hama.util.ZKUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+/**
+ * Monitor daemon performs tasks in order to monitor GroomServer's status.   
+ */
+public final class Monitor extends Thread implements MonitorListener { 
+
+  public static final Log LOG = LogFactory.getLog(Monitor.class);
+
+  private final Map<String, TaskWorker> workers =  // <jar path, task worker>
+    new ConcurrentHashMap<String, TaskWorker>();
+  private final BlockingQueue<Result> results = 
+    new LinkedBlockingQueue<Result>();
+  private final Publisher publisher;
+  private final Collector collector;
+  private final Initializer initializer;
+  private final Configuration configuration;
+  // TODO: may need zookeeper different from GroomServer 
+  // to alleviate groom server's barrier sync.
+  private final ZooKeeper zookeeper; 
+  private final String groomServerName;
+
+  /**
+   * Destination tells Publisher where to publish the result.
+   */
+  public static enum Destination {
+    ZK("zk"),
+    HDFS("hdfs"),
+    JMX("jmx");
+  
+    final String dest;
+    Destination(String dest) {
+      this.dest = dest;
+    }
+
+    public String value() { return this.dest; }
+  }
+  
+  /**
+   * An interface holds the result executed by a task.
+   */
+  public static interface Result { 
+ 
+    /**
+     * Name of the result. This attribute may be used in namespace as path. 
+     * For instance, when publishing to ZooKeeper, it writes to 
+     * "/monitor/<groom-server-name>/metrics/jvm" where jvm is the name
+     * provided by Result.name() function to identify the metrics record.
+     * So ideally name should be given without space e.g. `jvm', instead of
+     * `Java virtual machine'.
+     * @return name.
+     */
+    String name();
+    
+    /**
+     * Destinations where the result will be handled.
+     * @return Destination in array.
+     */
+    Destination[] destinations();
+
+    /**
+     * Result after a task is executed. 
+     * @return result returned.
+     */
+    Object get();
+  }
+
+  /**
+   * When executing task goes worng, TaskException is thrown. 
+   */
+  public static class TaskException extends Exception {
+    public TaskException() { super(); } 
+    public TaskException(String message) { super(message); } 
+    public TaskException(Throwable t) { super(t); } 
+  }
+
+  /**
+   * Monitor launchs a worker to run the task. 
+   */
+  static class TaskWorker implements Callable {
+
+    final Task task;
+
+    TaskWorker(final Task task) {
+      this.task = task;
+    }
+
+    public Object call() throws Exception {
+      return task.run();
+    }  
+     
+  }
+
+  /**
+   * Monitor class setup task with results and then task worker
+   * runs the task.
+   */
+  public static abstract class Task { 
+
+    final String name;
+    final AtomicReference<MonitorListener> listener = 
+      new AtomicReference<MonitorListener>();
+
+    public Task(String name) {
+      this.name = name;
+    }
+
+    /**
+     * This is only used by Configurator so a task can know which monitor
+     * to notify. 
+     */
+    final void setListener(MonitorListener listener) {
+      this.listener.set(listener);
+    }
+
+    /**
+     * Listener that will notify monitor. 
+     */
+    public final MonitorListener getListener() {
+      return this.listener.get();
+    }
+
+    /**
+     * The name of this task.
+     */
+    public final String getName() {
+      return this.name;
+    }
+
+    /**
+     * Actual flow that tells how a task would be executed. Within run()
+     * when an event occurrs, the task should call getListener().notify(Result) 
+     * passing the result so monitor can react accordingly.
+     */
+    public abstract Object run() throws TaskException; 
+      
+  }
+
+  public static final class ZKHandler implements PublisherHandler {
+    final ZooKeeper zk;
+    final String groomServerName;
+
+    public ZKHandler(ZooKeeper zk, String groomServerName) {
+      this.zk = zk;
+      this.groomServerName = groomServerName;
+    }
+
+    public void handle(Result result) {
+      Object obj = result.get(); 
+      if(obj instanceof MetricsRecord) {
+        String znode = "/monitor/"+this.groomServerName+"/metrics/"+result.name();
+        ZKUtil.create(zk, znode); // recursively create znode path
+        MetricsRecord record = (MetricsRecord) obj;
+        for(Metric<? extends Number> metric: record.metrics()) {
+          String name = metric.name();
+          Number value = metric.value();
+          try {
+            // znode must exists so that child (znode/name) can be created.
+            if(null != this.zk.exists(znode, false)) { 
+              if(LOG.isDebugEnabled())
+                LOG.debug("Name & value are going to be publish to zk -> ["+name+"] ["+value+"]");
+              if(null == zk.exists(znode+File.separator+name, false)) {
+                String p = this.zk.create(znode+File.separator+name, toBytes(value), 
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                LOG.info("Successfully publish data to zk with path to `"+p+"'");
+              } else {
+                // can we just update by increasing 1 version?
+                this.zk.setData(znode+File.separator+name, toBytes(value), -1); 
+                LOG.info("Successfully update data in znode: "+znode);
+              }
+            }
+          } catch (KeeperException ke) {
+            LOG.warn(ke);
+          } catch(InterruptedException ie) {
+            LOG.warn(ie);
+          }
+        }
+      } else {
+        LOG.warn(ZKHandler.class.getSimpleName()+
+        " don't know how to handle the result."+obj);
+      }
+    }
+
+    byte[] toBytes(Number value) {
+      if(value instanceof Double) {
+        return Bytes.toBytes(value.longValue());
+      } else if(value instanceof Float) {
+        return Bytes.toBytes(value.floatValue());
+      } else if(value instanceof Integer) {
+        return Bytes.toBytes(value.intValue());
+      } else if(value instanceof Long) {
+        return Bytes.toBytes(value.longValue());
+      } else if(value instanceof Short) {
+        return Bytes.toBytes(value.shortValue());
+      } else {
+        LOG.warn("Unknown type for value:"+value);
+        return null;
+      }
+    }
+
+  }
+
+  static interface PublisherHandler {
+    void handle(Result result);
+  }
+
+  static final class Publisher extends Thread {
+
+    final ExecutorService pool;
+    final Configuration conf;
+    final BlockingQueue<Result> results;
+    final ConcurrentMap<Destination, PublisherHandler> handlers = 
+      new ConcurrentHashMap<Destination, PublisherHandler>();
+
+    Publisher(Configuration conf, BlockingQueue<Result> results) {
+      pool = Executors.newCachedThreadPool();
+      this.conf = conf;
+      this.results = results;
+      setName(this.getClass().getSimpleName());
+      setDaemon(true);
+    }
+
+    void bind(Destination dest, PublisherHandler handler) {
+      handlers.putIfAbsent(dest, handler); 
+    }
+
+    PublisherHandler get(Destination dest) {
+      return handlers.get(dest);
+    }
+
+    public void run() {
+      try {
+        while(!Thread.currentThread().interrupted()) {
+          final Result result = results.take();
+          pool.submit(new Callable() {
+            public Object call() throws Exception {
+              for(Destination dest: result.destinations()) {
+                String name = result.name();
+                Publisher.this.get(dest).handle(result);
+              }// for
+              return null;
+            }
+          }); 
+          int period = conf.getInt("bsp.monitor.publisher.period", 5);
+          Thread.sleep(period*1000);
+        }
+      } catch(InterruptedException ie) {
+        pool.shutdown();
+        LOG.warn("Publisher is interrupted.", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  static final class Collector extends Thread {
+
+    final ExecutorService pool;
+    final Configuration conf;
+    final Map<String, TaskWorker> workers;
+
+    Collector(Configuration conf, Map<String, TaskWorker> workers) {
+      pool = Executors.newCachedThreadPool();
+      this.conf = conf;
+      this.workers = workers;
+      setName(this.getClass().getSimpleName());
+      setDaemon(true);
+    }
+
+    public void run() {
+      try { 
+        while(!Thread.currentThread().interrupted()) {
+          LOG.info("How many workers will be executed by collector? "+workers.size());
+          for(TaskWorker worker: workers.values()) {
+            pool.submit(worker);
+          }
+          int period = conf.getInt("bsp.monitor.collector.period", 5);
+          Thread.sleep(period*1000);
+        }
+      } catch(InterruptedException ie) {
+        pool.shutdown();
+        LOG.warn(this.getClass().getSimpleName()+" is interrupted.", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+  } 
+
+  static final class Initializer extends Thread {
+
+    final Configuration conf;
+    final Map<String, TaskWorker> workers;
+    final MonitorListener listener;
+
+    Initializer(Map<String, TaskWorker> workers, Configuration conf, 
+        MonitorListener listener) {
+      this.workers = workers;
+      this.conf = conf;
+      this.listener = listener;
+    } 
+
+    /**
+     * Load jar from plugin directory for executing task.
+     */
+    public void run() {
+      try {
+        while(!Thread.currentThread().interrupted()) {
+          Map<String, Task> tasks = 
+            Configurator.configure((HamaConfiguration)this.conf, listener);
+          for(Map.Entry<String, Task> entry: tasks.entrySet()) {
+            String jarPath = entry.getKey();
+            Task t = entry.getValue();
+            TaskWorker old = (TaskWorker)
+              ((ConcurrentMap)this.workers).putIfAbsent(jarPath, new TaskWorker(t));
+            if(null != old) {
+              ((ConcurrentMap)this.workers).replace(jarPath, 
+              new TaskWorker(t));
+            }
+          }
+          LOG.debug("Task worker list's size: "+workers.size());
+          int period = conf.getInt("bsp.monitor.initializer.period", 5);
+          Thread.sleep(period*1000);
+        }// while
+      } catch(InterruptedException ie) {
+        LOG.warn(this.getClass().getSimpleName()+" is interrupted.", ie);
+        Thread.currentThread().interrupt();
+      } catch (IOException ioe) {
+        LOG.warn(this.getClass().getSimpleName()+" can not load jar file "+
+                 " from plugin directory.", ioe);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+  
+  /**
+   * Constructor 
+   */
+  public Monitor(Configuration configuration, ZooKeeper zookeeper, 
+      String groomServerName) {
+    this.configuration = configuration;  
+    if(null == this.configuration)
+      throw new NullPointerException("No configuration is provided.");
+    this.zookeeper = zookeeper;
+    if(null == this.zookeeper)
+      throw new NullPointerException("ZooKeeper is not provided.");
+    this.groomServerName = groomServerName;
+    if(null == this.groomServerName)
+      throw new NullPointerException("Groom server name is not provided.");
+    this.initializer = new Initializer(workers, configuration, this);
+    this.collector = new Collector(configuration, workers);
+    this.publisher = new Publisher(configuration, results);
+    this.publisher.bind(ZK, new ZKHandler(this.zookeeper, 
+      this.groomServerName));
+    setName(this.getClass().getSimpleName());
+    setDaemon(true);
+  }
+
+  /**
+   * Monitor class load jar files and initialize tasks.
+   * Dynamic realods if a new task is available.
+   */
+  public void initialize() { 
+    initializer.start(); 
+    collector.start();
+    publisher.start();
+  }
+
+  public void notify(Result result) {
+    try {
+      results.put(result); 
+      LOG.info(result.name()+" is put to queue (size is "+results.size()+")");
+    } catch(InterruptedException ie) {
+      LOG.warn(this.getClass().getSimpleName()+" is interrupted.", ie);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void run() {
+    initialize();
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import org.apache.hama.monitor.Monitor.Result; 
+
+/**
+ * MonitorListener passes the result for notification. 
+ */
+public interface MonitorListener { 
+  
+  /**
+   * When an event is triggered, the task passes the result to notify
+   * the monitor.
+   */
+  void notify(Result result); 
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.File;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids; 
+import org.apache.zookeeper.ZooKeeper;
+
+public class ZKUtil {
+
+  public static final Log LOG = LogFactory.getLog(ZKUtil.class); 
+
+  /**
+   * Recursively create ZooKeeper's znode with corresponded path, which 
+   * starts from the root (/).
+   * @param zk is the target where znode is to be created.
+   * @param path of the znode on ZooKeeper server.
+   */
+  public static void create(ZooKeeper zk, String path) {
+    if(LOG.isDebugEnabled()) LOG.debug("Path to be splitted: "+path);
+    if(!path.startsWith("/"))
+      throw new IllegalArgumentException("Path is not started from root(/): "+
+      path);
+    StringTokenizer token = new StringTokenizer(path, File.separator); 
+    int count = token.countTokens();
+    if(0 >= count) 
+      throw new RuntimeException("Can not correctly split the path into.");
+    String[] parts = new String[count];
+    int pos = 0;
+    while(token.hasMoreTokens()) {
+      parts[pos] = token.nextToken();
+      if(LOG.isDebugEnabled()) LOG.debug("Splitted string:"+parts[pos]);
+      pos++;
+    }
+    StringBuilder builder = new StringBuilder();
+    for(String part: parts) {
+      try {
+        builder.append(File.separator+part);
+        if(null == zk.exists(builder.toString(), false)) {
+          zk.create(builder.toString(), null, Ids.OPEN_ACL_UNSAFE, 
+          CreateMode.PERSISTENT);
+        } 
+      } catch(KeeperException ke) {
+        LOG.warn(ke);
+      } catch(InterruptedException ie) {
+        LOG.warn(ie);
+      }
+    }
+  }
+
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class TestZKUtil extends TestCase {
+
+  MockZK zk;
+  String path;
+  String[] parts;
+  int pos = 0;
+  StringBuffer sb = new StringBuffer();
+
+  class MockZK extends ZooKeeper {
+
+    public MockZK(String connectString, int timeout, Watcher watcher) 
+        throws IOException { 
+      super(connectString, timeout, watcher);
+    }
+   
+    // create is called in for loop 
+    public String create(String path, byte[] data, List<ACL> acl, 
+        CreateMode createMode) throws KeeperException, InterruptedException {  
+      parts[pos] = path; 
+      pos++;
+      sb.append(File.separator+path);
+      StringBuilder builder = new StringBuilder();
+      for(int i=0;i<pos;i++) {
+        builder.append(File.separator+parts[i]);
+      }
+      assertEquals("Make sure path created is consistent.", sb.toString(), builder.toString());
+      return path;
+    }
+  }
+
+  public void setUp() throws Exception {
+    this.zk = new MockZK("localhost:2181", 3000, null);
+    this.path = "/monitor/groom_lab01_61000/metrics/jvm";
+    StringTokenizer token = new StringTokenizer(path, File.separator);
+    int count = token.countTokens(); // should be 4
+    assertEquals("Make sure token are 4.", count, 4);
+    this.parts = new String[count]; // 
+  }
+
+  public void testCreatePath() throws Exception {
+    ZKUtil.create(this.zk, path); 
+  }
+
+}