You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ch...@apache.org on 2012/02/25 09:07:52 UTC
svn commit: r1293544 - in /incubator/hama/trunk: contrib/
contrib/monitor-plugin/ contrib/monitor-plugin/jvm-metrics/
contrib/monitor-plugin/jvm-metrics/src/
contrib/monitor-plugin/jvm-metrics/src/main/
contrib/monitor-plugin/jvm-metrics/src/main/java/...
Author: chl501
Date: Sat Feb 25 08:07:51 2012
New Revision: 1293544
URL: http://svn.apache.org/viewvc?rev=1293544&view=rev
Log:
[HAMA-495] provides features similar to ganglia's gmond so that users can monitor
service.
Added:
incubator/hama/trunk/contrib/
incubator/hama/trunk/contrib/monitor-plugin/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/
incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
Added: incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml (added)
+++ incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/pom.xml Sat Feb 25 08:07:51 2012
@@ -0,0 +1,63 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.hama.monitor.plugin</groupId>
+ <artifactId>jvm-plugin</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>jvm-plugin</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-core</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>JvmTask</mainClass>
+ <packageName>org.apache.hama.monitor.plugin</packageName>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
Added: incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java (added)
+++ incubator/hama/trunk/contrib/monitor-plugin/jvm-metrics/src/main/java/org/apache/hama/monitor/plugin/JvmTask.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.plugin;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.lang.Thread.State.*;
+import static org.apache.hama.monitor.plugin.JvmTask.Metrics.*;
+import static org.apache.hama.monitor.Monitor.Destination.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.monitor.Metric;
+import org.apache.hama.monitor.MetricsRecord; // TODO: should be moved to org.apache.hama.monitor.metrics package
+import org.apache.hama.monitor.Monitor.Destination;
+import org.apache.hama.monitor.Monitor.Result;
+import org.apache.hama.monitor.Monitor.Task;
+import org.apache.hama.monitor.Monitor.TaskException;
+
+/**
+ * Monitor task that samples the running JVM through the
+ * {@code java.lang.management} MXBeans. Every call to {@link #run()} builds a
+ * new {@link MetricsRecord} named "jvm" holding memory, garbage collection and
+ * thread-state metrics, stores it in this task's {@link JvmResult} and hands
+ * that result to the listener returned by {@code getListener()}.
+ */
+public final class JvmTask extends Task {
+
+  public static final Log LOG = LogFactory.getLog(JvmTask.class);
+
+  /** Bytes per megabyte; memory figures are divided by this before reporting. */
+  private static final long M = 1024 * 1024;
+
+  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+  private final List<GarbageCollectorMXBean> gcBeans =
+      ManagementFactory.getGarbageCollectorMXBeans();
+  private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+  /** Single result holder; {@link #run()} replaces the record it refers to. */
+  private final Result result;
+
+  /**
+   * Names and human readable descriptions of the aggregate metrics this task
+   * reports.
+   */
+  public static enum Metrics {
+    MemNonHeapUsedM("Non-heap memory used in MB"),
+    MemNonHeapCommittedM("Non-heap memory committed in MB"),
+    MemHeapUsedM("Heap memory used in MB"),
+    MemHeapCommittedM("Heap memory committed in MB"),
+    GcCount("Total GC count"),
+    GcTimeMillis("Total GC time in milliseconds"),
+    ThreadsNew("Number of new threads"),
+    ThreadsRunnable("Number of runnable threads"),
+    ThreadsBlocked("Number of blocked threads"),
+    ThreadsWaiting("Number of waiting threads"),
+    ThreadsTimedWaiting("Number of timed waiting threads"),
+    ThreadsTerminated("Number of terminated threads");
+
+    private final String desc;
+
+    Metrics(String desc) {
+      this.desc = desc;
+    }
+
+    /** @return the description given to the constant. */
+    public String description() {
+      return desc;
+    }
+
+    @Override
+    public String toString() {
+      return "name " + name() + " description " + desc;
+    }
+  }
+
+  /**
+   * Result that exposes the most recently collected {@link MetricsRecord}
+   * under the name "jvm" and asks for it to be published to ZK, HDFS and JMX.
+   */
+  public static final class JvmResult implements Result {
+    final AtomicReference<MetricsRecord> ref =
+        new AtomicReference<MetricsRecord>();
+
+    @Override
+    public String name() {
+      return "jvm";
+    }
+
+    /**
+     * Replaces the record returned by {@link #get()}.
+     * @param record the newly collected record.
+     */
+    public void set(MetricsRecord record) {
+      ref.set(record);
+    }
+
+    @Override
+    public MetricsRecord get() {
+      return this.ref.get();
+    }
+
+    @Override
+    public Destination[] destinations() {
+      return new Destination[] { ZK, HDFS, JMX };
+    }
+
+  }
+
+  public JvmTask() {
+    super(JvmTask.class.getSimpleName());
+    this.result = new JvmResult();
+  }
+
+  /**
+   * Collects memory, GC and thread metrics into a new record, stores the
+   * record in the result and notifies the listener.
+   * @return always {@code null}; the data travels through the listener.
+   * @throws TaskException declared by the base class; not thrown by the
+   *         visible code of this method.
+   */
+  @Override
+  public Object run() throws TaskException {
+    final MetricsRecord record = new MetricsRecord("jvm", "Jvm metrics stats.");
+    memory(record);
+    gc(record);
+    threads(record);
+    ((JvmResult) this.result).set(record); // make the record visible via get()
+    getListener().notify(this.result); // notify monitor
+    return null;
+  }
+
+  /** Adds heap and non-heap used/committed sizes, expressed in whole MB. */
+  private void memory(final MetricsRecord record) {
+    final MemoryUsage memNonHeap = memoryMXBean.getNonHeapMemoryUsage();
+    final MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
+    final long nonHeapUsed = memNonHeap.getUsed() / M;
+    final long nonHeapCommitted = memNonHeap.getCommitted() / M;
+    final long heapUsed = memHeap.getUsed() / M;
+    final long heapCommitted = memHeap.getCommitted() / M;
+
+    record.add(new Metric<Long>(MemNonHeapUsedM, nonHeapUsed));
+    record.add(new Metric<Long>(MemNonHeapCommittedM, nonHeapCommitted));
+    record.add(new Metric<Long>(MemHeapUsedM, heapUsed));
+    record.add(new Metric<Long>(MemHeapCommittedM, heapCommitted));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(MemNonHeapUsedM.description() + ": " + nonHeapUsed);
+      LOG.debug(MemNonHeapCommittedM.description() + ": " + nonHeapCommitted);
+      LOG.debug(MemHeapUsedM.description() + ": " + heapUsed);
+      LOG.debug(MemHeapCommittedM.description() + ": " + heapCommitted);
+    }
+  }
+
+  /**
+   * Adds one count/time metric pair per garbage collector (metric name is the
+   * prefix followed by the collector name) plus the totals over all
+   * collectors.
+   */
+  private void gc(final MetricsRecord record) {
+    long count = 0;
+    long timeMillis = 0;
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      final long c = gcBean.getCollectionCount();
+      final long t = gcBean.getCollectionTime();
+      final String name = gcBean.getName();
+      record.add(new Metric<Long>("GcCount" + name, c));
+      record.add(new Metric<Long>("GcTimeMillis" + name, t));
+      count += c;
+      timeMillis += t;
+    }
+    record.add(new Metric<Long>(GcCount, count));
+    record.add(new Metric<Long>(GcTimeMillis, timeMillis));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(GcCount.description() + ": " + count);
+      LOG.debug(GcTimeMillis.description() + ": " + timeMillis);
+    }
+  }
+
+  /**
+   * Counts live threads per {@link Thread.State} and adds one metric per
+   * state. Every thread returned by the MXBean is inspected; a {@code break}
+   * inside the switch only leaves the switch, never the loop.
+   */
+  private void threads(final MetricsRecord record) {
+    int threadsNew = 0;
+    int threadsRunnable = 0;
+    int threadsBlocked = 0;
+    int threadsWaiting = 0;
+    int threadsTimedWaiting = 0;
+    int threadsTerminated = 0;
+    final long[] threadIds = threadMXBean.getAllThreadIds();
+    for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(threadIds, 0)) {
+      if (threadInfo == null) {
+        continue; // thread died between getAllThreadIds() and getThreadInfo()
+      }
+      switch (threadInfo.getThreadState()) {
+        case NEW:
+          threadsNew++;
+          break;
+        case RUNNABLE:
+          threadsRunnable++;
+          break;
+        case BLOCKED:
+          threadsBlocked++;
+          break;
+        case WAITING:
+          threadsWaiting++;
+          break;
+        case TIMED_WAITING:
+          threadsTimedWaiting++;
+          break;
+        case TERMINATED:
+          threadsTerminated++;
+          break;
+      }
+    }
+
+    record.add(new Metric<Integer>(ThreadsNew, threadsNew));
+    record.add(new Metric<Integer>(ThreadsRunnable, threadsRunnable));
+    record.add(new Metric<Integer>(ThreadsBlocked, threadsBlocked));
+    record.add(new Metric<Integer>(ThreadsWaiting, threadsWaiting));
+    record.add(new Metric<Integer>(ThreadsTimedWaiting, threadsTimedWaiting));
+    record.add(new Metric<Integer>(ThreadsTerminated, threadsTerminated));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(ThreadsNew.description() + ": " + threadsNew);
+      LOG.debug(ThreadsRunnable.description() + ": " + threadsRunnable);
+      LOG.debug(ThreadsBlocked.description() + ": " + threadsBlocked);
+      LOG.debug(ThreadsWaiting.description() + ": " + threadsWaiting);
+      LOG.debug(ThreadsTimedWaiting.description() + ": " + threadsTimedWaiting);
+      LOG.debug(ThreadsTerminated.description() + ": " + threadsTerminated);
+    }
+  }
+}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1293544&r1=1293543&r2=1293544&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sat Feb 25 08:07:51 2012
@@ -57,6 +57,7 @@ import org.apache.hama.bsp.sync.SyncExce
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.monitor.Monitor;
import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.zookeeper.QuorumPeer;
import org.apache.log4j.LogManager;
@@ -320,6 +321,13 @@ public class GroomServer implements Runn
this.instructor.bind(DispatchTasksDirective.class,
new DispatchTasksHandler());
instructor.start();
+
+ if(conf.getBoolean("bsp.monitor.enabled", true)) {
+ // TODO: conf.get("bsp.monitor.class.impl", "Monitor.class")
+ // so user can switch to customized monitor impl if necessary.
+ new Monitor(conf, zk, this.groomServerName).start();
+ }
+
this.running = true;
this.initialized = true;
}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.monitor.Monitor.Task;
+import org.apache.hama.monitor.Monitor.Result;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Configurator loads and configure jar files.
+ */
+public final class Configurator {
+
+ public static final Log LOG = LogFactory.getLog(Configurator.class);
+ public static final String DEFAULT_PLUGINS_DIR = "plugins";
+ private static final ConcurrentMap<String, Long> repos =
+ new ConcurrentHashMap<String, Long>();
+
+ /**
+ * @param conf file points out the plugin dir location.
+ * @return Map contains jar path and task to be executed.
+ */
+ public static Map<String, Task> configure(HamaConfiguration conf,
+ MonitorListener listener) throws IOException {
+ String hamaHome = System.getProperty("hama.home.dir");
+ String pluginPath = conf.get("bsp.monitor.plugins.dir",
+ hamaHome+File.separator+DEFAULT_PLUGINS_DIR);
+ File pluginDir = new File(pluginPath);
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Map<String, Task> taskList = new HashMap<String, Task>();
+ LOG.info("Scanning jar files within "+pluginDir+".");
+ for(File jar: pluginDir.listFiles()) {
+ String jarPath = jar.getPath();
+ Long timestamp = repos.get(jarPath);
+ if(null == timestamp || jar.lastModified() > timestamp) {
+ Task t = load(jar, loader);
+ if(null != t) {
+ t.setListener(listener);
+ taskList.put(jarPath, t);
+ repos.put(jarPath, new Long(jar.lastModified()));
+ LOG.info(jar.getName()+" is loaded.");
+ }
+ }
+ }
+ return taskList;
+ }
+
+ /**
+ * Load jar from specified path.
+ * @param path to the jar file.
+ * @param loader of the current thread.
+ * @return task to be run.
+ */
+ private static Task load(File path, ClassLoader loader) throws IOException {
+ JarFile jar = new JarFile(path);
+ Manifest manifest = jar.getManifest();
+ String pkg = manifest.getMainAttributes().getValue("Package");
+ String main = manifest.getMainAttributes().getValue("Main-Class");
+ if(null == pkg || null == main )
+ throw new NullPointerException("Package or main class not found "+
+ "in menifest file.");
+ String namespace = pkg + File.separator + main;
+ namespace = namespace.replaceAll(File.separator, ".");
+ LOG.info("Task class to be loaded: "+namespace);
+ URLClassLoader child =
+ new URLClassLoader(new URL[]{path.toURI().toURL()}, loader);
+ Thread.currentThread().setContextClassLoader(child);
+ Class<?> taskClass = null;
+ try {
+ taskClass = Class.forName(namespace, true, child); // task class
+ } catch (ClassNotFoundException cnfe) {
+ LOG.warn("Task class is not found.", cnfe);
+ }
+ if(null == taskClass) return null;
+
+ try {
+ return (Task)taskClass.newInstance();
+ } catch(InstantiationException ie) {
+ LOG.warn("Unable to instantiate task class."+namespace, ie);
+ } catch(IllegalAccessException iae) {
+ LOG.warn(iae);
+ }
+ return null;
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Metric.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+public final class Metric<T> {
+
+ private final String name;
+ private final String description;
+ private final T value;
+
+ public Metric(Enum name, T value){
+ this(name.name(), name.name()+" metric.", value);
+ }
+
+ public Metric(String name, T value){
+ this(name, name+" metric.", value);
+ }
+
+ public Metric(String name, String description, T value){
+ this.name = name;
+ this.description = description;
+ this.value = value;
+ }
+
+ public final String name(){
+ return this.name;
+ }
+
+ public final String description(){
+ return this.description;
+ }
+
+ public final T value(){
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object target){
+ if (target == this)
+ return true;
+ if (null == target)
+ return false;
+ if (getClass() != target.getClass())
+ return false;
+
+ Metric m = (Metric) target;
+ if(!name().equals(m.name))
+ return false;
+ if(!description().equals(m.description))
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode(){
+ int result = 17;
+ result = 37 * result + name.hashCode();
+ result = 37 * result + description.hashCode();
+ return result;
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
+
+/**
+ * Represents a record containing multiple metrics, optionally classified by
+ * tags. Metrics and tags are kept in the order they were added.
+ */
+public final class MetricsRecord {
+
+  private final String name, description;
+  private final List<Metric<? extends Number>> metrics =
+      new CopyOnWriteArrayList<Metric<? extends Number>>();
+  private final List<MetricsTag> tags = new CopyOnWriteArrayList<MetricsTag>();
+
+  /**
+   * @param name of the record.
+   * @param description of the record.
+   */
+  public MetricsRecord(String name, String description){
+    this.name = name;
+    this.description = description;
+  }
+
+  /**
+   * Creates a record described as "&lt;name&gt; record.".
+   * @param name of the record.
+   */
+  public MetricsRecord(String name){
+    this(name, name+" record.");
+  }
+
+  /** @return name of this record. */
+  public final String name(){
+    return name;
+  }
+
+  /** @return description of this record. */
+  public final String description(){
+    return description;
+  }
+
+  /**
+   * Appends a single metric.
+   * @param metric to be added.
+   */
+  public final void add(Metric<? extends Number> metric){
+    metrics.add(metric);
+  }
+
+  /**
+   * Appends all given metrics to this record.
+   * @param metrics to be added; the list itself is left untouched.
+   */
+  public final void add(List<Metric<? extends Number>> metrics){
+    // The parameter shadows the field, so the field must be qualified;
+    // otherwise the argument list would be appended to itself.
+    this.metrics.addAll(metrics);
+  }
+
+  /**
+   * Adds a tag whose name is the key's {@code toString()}.
+   */
+  public final void tag(Enum<?> key, String value){
+    tags.add(new MetricsTag(key.toString(), value));
+  }
+
+  /**
+   * Adds a tag with the given name and value.
+   */
+  public final void tag(String key, String value){
+    tags.add(new MetricsTag(key, value));
+  }
+
+  /** @return read-only view of the tags. */
+  public final Collection<MetricsTag> tags(){
+    return Collections.unmodifiableList(tags);
+  }
+
+  /** @return read-only view of the metrics. */
+  public final List<Metric<? extends Number>> metrics(){
+    return Collections.unmodifiableList(metrics);
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+/**
+ * Name/value pair used to classify a MetricsRecord. Instances are immutable
+ * and compare equal when both name and value are equal.
+ */
+public final class MetricsTag {
+  private final String name;
+  private final String value;
+
+  /**
+   * @param name of the tag.
+   * @param value of the tag.
+   */
+  public MetricsTag(String name, String value){
+    this.name = name;
+    this.value = value;
+  }
+
+  /** @return name of this tag. */
+  public final String name(){
+    return name;
+  }
+
+  /** @return value of this tag. */
+  public final String value(){
+    return value;
+  }
+
+  @Override
+  public boolean equals(Object target){
+    if (this == target) {
+      return true;
+    }
+    // The class is final, so instanceof matches exactly this class and also
+    // rejects null.
+    if (!(target instanceof MetricsTag)) {
+      return false;
+    }
+    final MetricsTag other = (MetricsTag) target;
+    return other.name.equals(name) && other.value.equals(value);
+  }
+
+  @Override
+  public int hashCode(){
+    final int withName = 37 * 17 + name().hashCode();
+    return 37 * withName + value().hashCode();
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hama.monitor.Monitor.Destination.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.util.Bytes;
+import org.apache.hama.util.ZKUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+/**
+ * Monitor daemon performs tasks in order to monitor GroomServer's status.
+ */
+public final class Monitor extends Thread implements MonitorListener {
+
+ public static final Log LOG = LogFactory.getLog(Monitor.class);
+
+ private final Map<String, TaskWorker> workers = // <jar path, task worker>
+ new ConcurrentHashMap<String, TaskWorker>();
+ private final BlockingQueue<Result> results =
+ new LinkedBlockingQueue<Result>();
+ private final Publisher publisher;
+ private final Collector collector;
+ private final Initializer initializer;
+ private final Configuration configuration;
+ // TODO: may need zookeeper different from GroomServer
+ // to alleviate groom server's barrier sync.
+ private final ZooKeeper zookeeper;
+ private final String groomServerName;
+
+ /**
+  * Places a result can be published to. Each constant carries a lower-case
+  * string form that is returned by {@link #value()}.
+  */
+ public static enum Destination {
+   ZK("zk"),
+   HDFS("hdfs"),
+   JMX("jmx");
+
+   final String dest;
+
+   Destination(String target) {
+     dest = target;
+   }
+
+   /** @return the string form given to the constant. */
+   public String value() {
+     return dest;
+   }
+ }
+
+ /**
+ * An interface that holds the result produced by a task, together with the
+ * destinations the result should be handled by.
+ */
+ public static interface Result {
+
+ /**
+ * Name of the result. This attribute may be used in a namespace as a path
+ * element. For instance, when publishing to ZooKeeper, the result is
+ * written under "/monitor/<groom-server-name>/metrics/jvm" where jvm is
+ * the name provided by Result.name() to identify the metrics record.
+ * So ideally the name should be given without spaces, e.g. `jvm' instead
+ * of `Java virtual machine'.
+ * @return name.
+ */
+ String name();
+
+ /**
+ * Destinations where the result will be handled.
+ * @return destinations as an array.
+ */
+ Destination[] destinations();
+
+ /**
+ * Result after a task is executed.
+ * @return the result object.
+ */
+ Object get();
+ }
+
+ /**
+ * Checked exception thrown when executing a task goes wrong.
+ */
+ public static class TaskException extends Exception {
+ public TaskException() { super(); }
+ public TaskException(String message) { super(message); }
+ public TaskException(Throwable t) { super(t); }
+ }
+
+ /**
+  * Callable wrapper the monitor launches to run a single task.
+  */
+ static class TaskWorker implements Callable {
+
+   final Task task;
+
+   TaskWorker(final Task task) {
+     this.task = task;
+   }
+
+   /**
+    * Runs the wrapped task.
+    * @return whatever the task's run() returns.
+    */
+   @Override
+   public Object call() throws Exception {
+     final Object outcome = this.task.run();
+     return outcome;
+   }
+
+ }
+
+ /**
+  * Base class of the work units run by a task worker. A task has a name and
+  * holds the listener it should report its results to.
+  */
+ public static abstract class Task {
+
+   final String name;
+   final AtomicReference<MonitorListener> listener =
+       new AtomicReference<MonitorListener>();
+
+   /**
+    * @param name of this task.
+    */
+   public Task(String name) {
+     this.name = name;
+   }
+
+   /**
+    * Stores the listener to be notified. Package-private: it is only used
+    * by Configurator so a task can know which monitor to notify.
+    */
+   final void setListener(MonitorListener listener) {
+     this.listener.set(listener);
+   }
+
+   /**
+    * @return the listener that will notify the monitor, as last set through
+    *         setListener().
+    */
+   public final MonitorListener getListener() {
+     return listener.get();
+   }
+
+   /**
+    * @return the name of this task.
+    */
+   public final String getName() {
+     return name;
+   }
+
+   /**
+    * Actual flow that tells how a task is executed. Within run(), when an
+    * event occurs, the task should call getListener().notify(Result),
+    * passing the result so the monitor can react accordingly.
+    */
+   public abstract Object run() throws TaskException;
+
+ }
+
+ public static final class ZKHandler implements PublisherHandler {
+ final ZooKeeper zk;
+ final String groomServerName;
+
+ public ZKHandler(ZooKeeper zk, String groomServerName) {
+ this.zk = zk;
+ this.groomServerName = groomServerName;
+ }
+
+ public void handle(Result result) {
+ Object obj = result.get();
+ if(obj instanceof MetricsRecord) {
+ String znode = "/monitor/"+this.groomServerName+"/metrics/"+result.name();
+ ZKUtil.create(zk, znode); // recursively create znode path
+ MetricsRecord record = (MetricsRecord) obj;
+ for(Metric<? extends Number> metric: record.metrics()) {
+ String name = metric.name();
+ Number value = metric.value();
+ try {
+ // znode must exists so that child (znode/name) can be created.
+ if(null != this.zk.exists(znode, false)) {
+ if(LOG.isDebugEnabled())
+ LOG.debug("Name & value are going to be publish to zk -> ["+name+"] ["+value+"]");
+ if(null == zk.exists(znode+File.separator+name, false)) {
+ String p = this.zk.create(znode+File.separator+name, toBytes(value),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ LOG.info("Successfully publish data to zk with path to `"+p+"'");
+ } else {
+ // can we just update by increasing 1 version?
+ this.zk.setData(znode+File.separator+name, toBytes(value), -1);
+ LOG.info("Successfully update data in znode: "+znode);
+ }
+ }
+ } catch (KeeperException ke) {
+ LOG.warn(ke);
+ } catch(InterruptedException ie) {
+ LOG.warn(ie);
+ }
+ }
+ } else {
+ LOG.warn(ZKHandler.class.getSimpleName()+
+ " don't know how to handle the result."+obj);
+ }
+ }
+
+ byte[] toBytes(Number value) {
+ if(value instanceof Double) {
+ return Bytes.toBytes(value.longValue());
+ } else if(value instanceof Float) {
+ return Bytes.toBytes(value.floatValue());
+ } else if(value instanceof Integer) {
+ return Bytes.toBytes(value.intValue());
+ } else if(value instanceof Long) {
+ return Bytes.toBytes(value.longValue());
+ } else if(value instanceof Short) {
+ return Bytes.toBytes(value.shortValue());
+ } else {
+ LOG.warn("Unknown type for value:"+value);
+ return null;
+ }
+ }
+
+ }
+
+ /**
+  * Handles results on behalf of the destination it is bound to in Publisher.
+  */
+ static interface PublisherHandler {
+ /**
+  * Processes a result that the publisher took from its queue.
+  * @param result to be handled.
+  */
+ void handle(Result result);
+ }
+
+ /**
+  * Takes results off the queue and hands each one, on a pooled thread, to
+  * the handlers bound to the result's destinations.
+  */
+ static final class Publisher extends Thread {
+
+   final ExecutorService pool;
+   final Configuration conf;
+   final BlockingQueue<Result> results;
+   final ConcurrentMap<Destination, PublisherHandler> handlers =
+     new ConcurrentHashMap<Destination, PublisherHandler>();
+
+   Publisher(Configuration conf, BlockingQueue<Result> results) {
+     pool = Executors.newCachedThreadPool();
+     this.conf = conf;
+     this.results = results;
+     setName(this.getClass().getSimpleName());
+     setDaemon(true);
+   }
+
+   /**
+    * Binds a handler to a destination; an existing binding is kept.
+    */
+   void bind(Destination dest, PublisherHandler handler) {
+     handlers.putIfAbsent(dest, handler);
+   }
+
+   /**
+    * @return handler bound to the destination, or null if there is none.
+    */
+   PublisherHandler get(Destination dest) {
+     return handlers.get(dest);
+   }
+
+   /**
+    * Publishes one result per period (bsp.monitor.publisher.period, in
+    * seconds, 5 by default) until the thread is interrupted. The pool is
+    * shut down however the loop ends.
+    */
+   public void run() {
+     try {
+       // isInterrupted() only reads the flag; the static interrupted()
+       // would clear it and hide the interrupt from whoever checks next.
+       while(!Thread.currentThread().isInterrupted()) {
+         final Result result = results.take();
+         pool.submit(new Callable<Object>() {
+           public Object call() throws Exception {
+             for(Destination dest: result.destinations()) {
+               PublisherHandler handler = Publisher.this.get(dest);
+               if(null == handler) {
+                 // nothing bound: skip instead of failing with an NPE that
+                 // would be lost in the never-inspected Future.
+                 LOG.warn("No handler is bound to destination "+dest+
+                   ", result `"+result.name()+"' is not published to it.");
+                 continue;
+               }
+               handler.handle(result);
+             }// for
+             return null;
+           }
+         });
+         int period = conf.getInt("bsp.monitor.publisher.period", 5);
+         Thread.sleep(period*1000L);
+       }
+     } catch(InterruptedException ie) {
+       LOG.warn("Publisher is interrupted.", ie);
+       Thread.currentThread().interrupt();
+     } finally {
+       pool.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Periodically submits every registered task worker to a thread pool.
+  */
+ static final class Collector extends Thread {
+
+   final ExecutorService pool;
+   final Configuration conf;
+   final Map<String, TaskWorker> workers;
+
+   Collector(Configuration conf, Map<String, TaskWorker> workers) {
+     pool = Executors.newCachedThreadPool();
+     this.conf = conf;
+     this.workers = workers;
+     setName(this.getClass().getSimpleName());
+     setDaemon(true);
+   }
+
+   /**
+    * Submits all workers once per period (bsp.monitor.collector.period, in
+    * seconds, 5 by default) until the thread is interrupted. The pool is
+    * shut down however the loop ends.
+    */
+   public void run() {
+     try {
+       // isInterrupted() only reads the flag; the static interrupted()
+       // would clear it and hide the interrupt from whoever checks next.
+       while(!Thread.currentThread().isInterrupted()) {
+         LOG.info("How many workers will be executed by collector? "+workers.size());
+         for(TaskWorker worker: workers.values()) {
+           pool.submit(worker);
+         }
+         int period = conf.getInt("bsp.monitor.collector.period", 5);
+         Thread.sleep(period*1000L);
+       }
+     } catch(InterruptedException ie) {
+       LOG.warn(this.getClass().getSimpleName()+" is interrupted.", ie);
+       Thread.currentThread().interrupt();
+     } finally {
+       pool.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Periodically asks Configurator for the configured tasks and registers a
+  * task worker for each of them, keyed by the path of the task's jar.
+  */
+ static final class Initializer extends Thread {
+
+   final Configuration conf;
+   final Map<String, TaskWorker> workers;
+   final MonitorListener listener;
+
+   Initializer(Map<String, TaskWorker> workers, Configuration conf,
+       MonitorListener listener) {
+     this.workers = workers;
+     this.conf = conf;
+     this.listener = listener;
+   }
+
+   /**
+    * Load jar from plugin directory for executing task. Tasks are
+    * reconfigured once per period (bsp.monitor.initializer.period, in
+    * seconds, 5 by default) until the thread is interrupted or the
+    * configuration fails with an IOException.
+    */
+   public void run() {
+     try {
+       // isInterrupted() only reads the flag; the static interrupted()
+       // would clear it and hide the interrupt from whoever checks next.
+       while(!Thread.currentThread().isInterrupted()) {
+         Map<String, Task> tasks =
+           Configurator.configure((HamaConfiguration)this.conf, listener);
+         for(Map.Entry<String, Task> entry: tasks.entrySet()) {
+           String jarPath = entry.getKey();
+           // The freshly configured task always supersedes the worker
+           // registered earlier for the same jar, so a plain put does it.
+           this.workers.put(jarPath, new TaskWorker(entry.getValue()));
+         }
+         LOG.debug("Task worker list's size: "+workers.size());
+         int period = conf.getInt("bsp.monitor.initializer.period", 5);
+         Thread.sleep(period*1000L);
+       }// while
+     } catch(InterruptedException ie) {
+       LOG.warn(this.getClass().getSimpleName()+" is interrupted.", ie);
+       Thread.currentThread().interrupt();
+     } catch (IOException ioe) {
+       // An I/O failure is not an interrupt: just report it and let the
+       // thread end without touching the interrupt flag.
+       LOG.warn(this.getClass().getSimpleName()+" can not load jar file "+
+         " from plugin directory.", ioe);
+     }
+   }
+ }
+
+ /**
+  * Creates a monitor together with its initializer, collector and publisher
+  * threads, and binds a ZKHandler to the ZK destination of the publisher.
+  * None of the threads is started here.
+  * @param configuration must not be null.
+  * @param zookeeper must not be null.
+  * @param groomServerName must not be null.
+  * @throws NullPointerException if any argument is null.
+  */
+ public Monitor(Configuration configuration, ZooKeeper zookeeper,
+     String groomServerName) {
+   if(configuration == null) {
+     throw new NullPointerException("No configuration is provided.");
+   }
+   if(zookeeper == null) {
+     throw new NullPointerException("ZooKeeper is not provided.");
+   }
+   if(groomServerName == null) {
+     throw new NullPointerException("Groom server name is not provided.");
+   }
+   this.configuration = configuration;
+   this.zookeeper = zookeeper;
+   this.groomServerName = groomServerName;
+   this.initializer = new Initializer(workers, configuration, this);
+   this.collector = new Collector(configuration, workers);
+   this.publisher = new Publisher(configuration, results);
+   PublisherHandler zkHandler = new ZKHandler(zookeeper, groomServerName);
+   this.publisher.bind(ZK, zkHandler);
+   setName(getClass().getSimpleName());
+   setDaemon(true);
+ }
+
+ /**
+ * Starts the initializer, collector and publisher threads, in that order.
+ * The initializer loads tasks from jar files and keeps reloading them
+ * periodically, so a task that becomes available later is picked up.
+ */
+ public void initialize() {
+ initializer.start();
+ collector.start();
+ publisher.start();
+ }
+
+ /**
+  * Puts a result on the queue that the publisher consumes. If the calling
+  * thread is interrupted while waiting on the queue, the result is not
+  * queued and the interrupt flag is set again.
+  * @param result handed over by a task.
+  */
+ public void notify(Result result) {
+   try {
+     results.put(result);
+   } catch(InterruptedException ie) {
+     LOG.warn(this.getClass().getSimpleName()+" is interrupted.", ie);
+     Thread.currentThread().interrupt();
+     return;
+   }
+   LOG.info(result.name()+" is put to queue (size is "+results.size()+")");
+ }
+
+ /**
+ * Runs the monitor by delegating to initialize().
+ */
+ public void run() {
+ initialize();
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import org.apache.hama.monitor.Monitor.Result;
+
+/**
+ * Callback through which a task hands its result over to the monitor.
+ */
+public interface MonitorListener {
+
+ /**
+ * Called by a task when an event is triggered, to notify the monitor.
+ * @param result produced by the task.
+ */
+ void notify(Result result);
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/ZKUtil.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.File;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class ZKUtil {
+
+  public static final Log LOG = LogFactory.getLog(ZKUtil.class);
+
+  /**
+   * Recursively create ZooKeeper's znode with corresponded path, which
+   * starts from the root (/). Znodes are created parents first, persistent,
+   * without data; the ones that already exist are left untouched. A
+   * KeeperException on one component is logged and the remaining components
+   * are still tried. An interrupt stops the creation and is re-asserted on
+   * the calling thread.
+   * @param zk is the target where znode is to be created.
+   * @param path of the znode on ZooKeeper server.
+   * @throws IllegalArgumentException if path does not start with `/'.
+   * @throws RuntimeException if path holds no component at all, e.g. `/'.
+   */
+  public static void create(ZooKeeper zk, String path) {
+    if(LOG.isDebugEnabled()) LOG.debug("Path to be splitted: "+path);
+    if(!path.startsWith("/"))
+      throw new IllegalArgumentException("Path is not started from root(/): "+
+        path);
+    // znode paths are always separated by `/'. File.separator is `\' on
+    // Windows, which would neither split the path nor rebuild a valid one.
+    StringTokenizer token = new StringTokenizer(path, "/");
+    if(0 >= token.countTokens())
+      throw new RuntimeException("Can not correctly split the path into.");
+    StringBuilder builder = new StringBuilder();
+    while(token.hasMoreTokens()) {
+      String part = token.nextToken();
+      if(LOG.isDebugEnabled()) LOG.debug("Splitted string:"+part);
+      builder.append('/').append(part);
+      String znode = builder.toString();
+      try {
+        if(null == zk.exists(znode, false)) {
+          zk.create(znode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+      } catch(KeeperException.NodeExistsException nee) {
+        // somebody else created it between exists() and create(): the znode
+        // is there, which is all that is wanted.
+        if(LOG.isDebugEnabled()) LOG.debug("Znode already exists: "+znode);
+      } catch(KeeperException ke) {
+        LOG.warn("Fail to create znode "+znode, ke);
+      } catch(InterruptedException ie) {
+        LOG.warn("Interrupted while creating znode "+znode, ie);
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+}
Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java?rev=1293544&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java Sat Feb 25 08:07:51 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Checks that ZKUtil.create() creates every component of a path, parents
+ * first, against a ZooKeeper client whose exists() and create() are mocked.
+ */
+public class TestZKUtil extends TestCase {
+
+  MockZK zk;
+  String path;
+  /** Paths passed to MockZK.create(), in call order. */
+  final List<String> created = new ArrayList<String>();
+
+  /**
+   * ZooKeeper client that answers exists() and create() locally, so the
+   * test does not depend on a running server.
+   */
+  class MockZK extends ZooKeeper {
+
+    public MockZK(String connectString, int timeout, Watcher watcher)
+        throws IOException {
+      super(connectString, timeout, watcher);
+    }
+
+    // Pretend that nothing exists yet, so that ZKUtil has to create every
+    // component of the path.
+    public Stat exists(String path, boolean watch)
+        throws KeeperException, InterruptedException {
+      return null;
+    }
+
+    // create is called in for loop
+    public String create(String path, byte[] data, List<ACL> acl,
+        CreateMode createMode) throws KeeperException, InterruptedException {
+      created.add(path);
+      return path;
+    }
+  }
+
+  public void setUp() throws Exception {
+    this.zk = new MockZK("localhost:2181", 3000, null);
+    this.path = "/monitor/groom_lab01_61000/metrics/jvm";
+  }
+
+  public void testCreatePath() throws Exception {
+    ZKUtil.create(this.zk, path);
+    assertEquals("Make sure path created is consistent.",
+      Arrays.asList("/monitor",
+        "/monitor/groom_lab01_61000",
+        "/monitor/groom_lab01_61000/metrics",
+        "/monitor/groom_lab01_61000/metrics/jvm"),
+      created);
+  }
+
+}