You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:14 UTC

[9/50] [abbrv] Remove troublesome carriage return

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java b/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
index 0ea56a2..4405dd7 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
@@ -1,82 +1,82 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.test;
-
-import org.apache.s4.comm.core.TaskManager;
-import org.apache.s4.comm.file.StaticTaskManager;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import org.apache.s4.comm.zk.ZkTaskSetup;
-import org.apache.s4.comm.zk.ZkTaskManager;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TaskManagerTest {
-    public static void main(String[] args) throws Exception {
-        // testZkTaskManager(args);
-        testStaticTaskManager(args);
-        Thread.sleep(10000);
-    }
-
-    private static void testStaticTaskManager(String[] args) {
-        String address = null;
-        address = "localhost:2181";
-        TaskManager taskManager = new StaticTaskManager(address,
-                                                        "taskmanagerTest",
-                                                        ClusterType.S4,
-                                                        null);
-        Map<String, String> customTaskData = new HashMap<String, String>();
-        Object acquireTask = taskManager.acquireTask(customTaskData);
-        System.out.println("Acuired Task:" + acquireTask);
-
-    }
-
-    private static void testZkTaskManager(String[] args) {
-        System.out.println("Here");
-        // "effortfell.greatamerica.corp.yahoo.com:2181"
-        String address = args[0];
-        address = "localhost:2181";
-        String processName = args[1];
-        ZkTaskSetup taskSetup = new ZkTaskSetup(address,
-                                                      "/taskmanagerTest",
-                                                      ClusterType.S4);
-        taskSetup.cleanUp();
-        taskSetup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" });
-        Object obj;
-        System.out.println(processName + " Going to Wait for a task");
-        HashMap<String, String> map = new HashMap<String, String>();
-        ZkTaskManager taskManager = new ZkTaskManager(address,
-                                                      "/taskmanagerTest",
-                                                      ClusterType.S4);
-        obj = taskManager.acquireTask(map);
-        System.out.println(processName + "taking up task: " + obj);
-        File f = new File("c:/" + obj + ".file");
-        f.delete();
-        while (true) {
-            if (f.exists()) {
-                break;
-            }
-            System.out.println(processName + " processing task: " + obj);
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-        System.out.println("Exiting task:" + obj);
-    }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.core.TaskManager;
+import org.apache.s4.comm.file.StaticTaskManager;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Manual smoke test that acquires a task from a task manager and prints it. */
+public class TaskManagerTest {
+    /** Runs the static task manager check, then sleeps for 10 seconds. */
+    public static void main(String[] args) throws Exception {
+        // testZkTaskManager(args);
+        testStaticTaskManager(args);
+        Thread.sleep(10000);
+    }
+
+    /** Acquires one task from a {@code StaticTaskManager} and prints it; {@code args} is unused. */
+    private static void testStaticTaskManager(String[] args) {
+        String address = "localhost:2181";
+        TaskManager taskManager = new StaticTaskManager(address, "taskmanagerTest", ClusterType.S4, null);
+        Map<String, String> customTaskData = new HashMap<String, String>();
+        Object acquireTask = taskManager.acquireTask(customTaskData);
+        System.out.println("Acuired Task:" + acquireTask);
+    }
+
+    /**
+     * Calls {@code cleanUp()} and {@code setUpTasks(...)} for "/taskmanagerTest", acquires a task
+     * and then polls every 10 seconds until the marker file {@code c:/<task>.file} exists or the
+     * thread is interrupted.
+     *
+     * @param args {@code args[0]} is read but overridden by "localhost:2181"; {@code args[1]} is
+     *             the process name that prefixes the printed messages
+     */
+    private static void testZkTaskManager(String[] args) {
+        System.out.println("Here");
+        // "effortfell.greatamerica.corp.yahoo.com:2181"
+        String address = args[0];
+        address = "localhost:2181"; // NOTE(review): discards args[0], presumably for local runs
+        String processName = args[1];
+        ZkTaskSetup taskSetup = new ZkTaskSetup(address, "/taskmanagerTest", ClusterType.S4);
+        taskSetup.cleanUp();
+        taskSetup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" });
+        System.out.println(processName + " Going to Wait for a task");
+        HashMap<String, String> map = new HashMap<String, String>();
+        ZkTaskManager taskManager = new ZkTaskManager(address, "/taskmanagerTest", ClusterType.S4);
+        Object obj = taskManager.acquireTask(map);
+        System.out.println(processName + "taking up task: " + obj);
+        File f = new File("c:/" + obj + ".file");
+        f.delete(); // the file showing up again is the signal to stop
+        while (!f.exists()) {
+            System.out.println(processName + " processing task: " + obj);
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                // Restore the flag and stop polling; swallowing it made the loop uninterruptible.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        System.out.println("Exiting task:" + obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java b/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
index d0c80c0..9cac542 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
@@ -1,122 +1,122 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.test;
-
-import org.apache.s4.comm.util.CommUtil;
-import org.apache.s4.comm.util.JSONUtil;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import org.apache.s4.comm.zk.ZkTaskSetup;
-import org.apache.s4.comm.zk.ZkTaskManager;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-public class TestTaskSetupApp {
-
-    public static void main(String[] args) throws Exception {
-        new TestTaskSetupApp().testTaskSetup1();
-    }
-
-    // test the case
-    public void testTaskSetup1() throws Exception {
-        String address = "effortfell.greatamerica.corp.yahoo.com:2181";
-        Watcher watcher = new Watcher() {
-
-            @Override
-            public void process(WatchedEvent event) {
-
-            }
-
-        };
-        // setup
-        ZooKeeper zk = new ZooKeeper(address, 30000, watcher);
-        String root = "/tasksetup_app_test";
-        ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4);
-        Map<String, String> task1 = new HashMap<String, String>();
-        task1.put("name", "task-1");
-
-        Map<String, String> task2 = new HashMap<String, String>();
-        task2.put("name", "task-2");
-        String tasksListRoot = root + "/tasks";
-        zkSetup.cleanUp();
-        Stat exists = zk.exists(tasksListRoot, false);
-        myassert(exists == null);
-        Object[] data = new Object[] { task1, task2 };
-        zkSetup.setUpTasks(data);
-
-        // verify that tasks are created
-        exists = zk.exists(tasksListRoot, false);
-        myassert(exists != null);
-        List<String> children = zk.getChildren(tasksListRoot, false);
-        myassert(children.size() == data.length);
-        boolean[] matched = new boolean[data.length];
-        for (String child : children) {
-            System.out.println(child);
-            String childPath = tasksListRoot + "/" + child;
-            Stat sTemp = zk.exists(childPath, false);
-            byte[] tempData = zk.getData(tasksListRoot + "/" + child,
-                                         false,
-                                         sTemp);
-            Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
-            // check if it matches any of the data
-            for (int i = 0; i < data.length; i++) {
-                Map<String, Object> newData = (Map<String, Object>) data[i];
-                if (!matched[i] && CommUtil.compareMaps(newData, map)) {
-                    matched[i] = true;
-                    break;
-                }
-            }
-        }
-        for (int i = 0; i < matched.length; i++) {
-            myassert(matched[i]);
-        }
-
-        // try running again and make verify new node is not created
-        Stat oldStat = zk.exists(tasksListRoot, false);
-        System.out.println("oldStat=" + oldStat);
-        zkSetup.setUpTasks(data);
-        Stat newStat = zk.exists(tasksListRoot, false);
-        System.out.println("newstat=" + newStat);
-        myassert(oldStat.getMtime() == newStat.getMtime());
-
-        // make change to task config and try running again and verify new
-        // config is uploaded
-        oldStat = zk.exists(tasksListRoot, false);
-        System.out.println("oldStat=" + oldStat.getVersion());
-        ((Map<String, String>) data[data.length - 1]).put("name", "changedname");
-        zkSetup.setUpTasks(data);
-        newStat = zk.exists(tasksListRoot, false);
-        System.out.println("newstat=" + newStat.getVersion());
-        System.out.println();
-        myassert(oldStat.getMtime() != newStat.getMtime());
-
-        // ensure version change is working
-        zkSetup.setUpTasks("1.0.0.0", data);
-    }
-
-    private void myassert(boolean b) {
-        if (!b) {
-            throw new AssertionError();
-        }
-    }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.util.CommUtil;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class TestTaskSetupApp {
+    /** Runs {@link #testTaskSetup1()} once and lets any failure propagate. */
+    public static void main(String[] args) throws Exception {
+        new TestTaskSetupApp().testTaskSetup1();
+    }
+
+    /**
+     * Checks {@code ZkTaskSetup} against a live ZooKeeper server: the task list node is absent
+     * after clean-up, holds one matching child per task after set-up, keeps its mtime when the
+     * same tasks are set up again, and gets a new mtime once a task's data has changed.
+     */
+    public void testTaskSetup1() throws Exception {
+        String address = "effortfell.greatamerica.corp.yahoo.com:2181";
+        Watcher watcher = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                // events are ignored: every read below passes watch=false
+            }
+        };
+        ZooKeeper zk = new ZooKeeper(address, 30000, watcher);
+        try {
+            String root = "/tasksetup_app_test";
+            ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4);
+            Map<String, String> task1 = new HashMap<String, String>();
+            task1.put("name", "task-1");
+            Map<String, String> task2 = new HashMap<String, String>();
+            task2.put("name", "task-2");
+            String tasksListRoot = root + "/tasks";
+            zkSetup.cleanUp();
+            myassert(zk.exists(tasksListRoot, false) == null);
+            Object[] data = new Object[] { task1, task2 };
+            zkSetup.setUpTasks(data);
+
+            // verify that tasks are created
+            myassert(zk.exists(tasksListRoot, false) != null);
+            List<String> children = zk.getChildren(tasksListRoot, false);
+            myassert(children.size() == data.length);
+            boolean[] matched = new boolean[data.length];
+            for (String child : children) {
+                System.out.println(child);
+                String childPath = tasksListRoot + "/" + child;
+                Stat sTemp = zk.exists(childPath, false);
+                byte[] tempData = zk.getData(childPath, false, sTemp);
+                Map<String, Object> map =
+                        (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
+                for (int i = 0; i < data.length; i++) {
+                    Map<String, Object> newData = (Map<String, Object>) data[i];
+                    if (!matched[i] && CommUtil.compareMaps(newData, map)) {
+                        matched[i] = true;
+                        break;
+                    }
+                }
+            }
+            for (boolean taskMatched : matched) {
+                myassert(taskMatched);
+            }
+
+            // set up the same tasks again and verify the task list node is untouched
+            Stat oldStat = zk.exists(tasksListRoot, false);
+            System.out.println("oldStat=" + oldStat);
+            zkSetup.setUpTasks(data);
+            Stat newStat = zk.exists(tasksListRoot, false);
+            System.out.println("newstat=" + newStat);
+            myassert(oldStat.getMtime() == newStat.getMtime());
+
+            // change the task config, run again and verify the new config is uploaded
+            oldStat = zk.exists(tasksListRoot, false);
+            System.out.println("oldStat=" + oldStat.getVersion());
+            ((Map<String, String>) data[data.length - 1]).put("name", "changedname");
+            zkSetup.setUpTasks(data);
+            newStat = zk.exists(tasksListRoot, false);
+            System.out.println("newstat=" + newStat.getVersion());
+            System.out.println();
+            myassert(oldStat.getMtime() != newStat.getMtime());
+
+            // ensure version change is working
+            zkSetup.setUpTasks("1.0.0.0", data);
+        } finally {
+            zk.close(); // release the client even when a check fails; it used to be leaked
+        }
+    }
+
+    /** Throws {@link AssertionError} unless {@code b} is true. */
+    private void myassert(boolean b) {
+        if (!b) {
+            throw new AssertionError();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java b/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
index df214c3..3bfcaa0 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
@@ -1,86 +1,86 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.tools;
-
-import org.apache.s4.comm.util.ConfigUtils;
-import org.apache.s4.comm.util.ConfigParser;
-import org.apache.s4.comm.util.ConfigParser.Cluster;
-import org.apache.s4.comm.util.ConfigParser.Config;
-import org.apache.s4.comm.zk.ZkTaskSetup;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class will set up initial tasks on the zookeeper USAGE: java AppTask
- * [clean] setup config.xml
- * 
- * @author kishoreg
- * 
- */
-public class TaskSetupApp {
-    public static void main(String[] args) {
-        String zkAddress = "";
-        boolean clean = false;
-        boolean setup = false;
-        String setupXml = null;
-        for (int i = 0; i < args.length; i++) {
-            if (i == 0) {
-                zkAddress = args[0];
-            }
-            if (args[i].equals("clean")) {
-                clean = true;
-            } else if (args[i].equals("setup")) {
-                setup = true;
-            } else if (i == args.length - 1) {
-                setupXml = args[i];
-            }
-        }
-        if (setupXml == null || !new File(setupXml).exists()) {
-            printusage("Set up xml: " + setupXml + " does not exist");
-        }
-        if (!setup && !clean) {
-            System.err.println("Invalid usage.");
-            printusage("Must specify at least one of of clean, setup.");
-        }
-        doMain(zkAddress, clean, setup, setupXml);
-    }
-
-    private static void printusage(String message) {
-        System.err.println(message);
-        System.err.println("java TaskSetupApp <zk_address> [clean|setup] setup_config_xml");
-        System.exit(1);
-    }
-
-    private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) {
-        ConfigParser parser = new ConfigParser();
-        Config config = parser.parse(setupXml);
-        for (Cluster cluster : config.getClusters()) {
-            processCluster(clean, zkAddress, cluster, config.getVersion());
-        }
-    }
-
-    private static void processCluster(boolean clean, String zkAddress, Cluster cluster, String version) {
-        List<Map<String,String>> clusterInfo = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false);
-        ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType());
-        if (clean) {
-            zkSetup.cleanUp();
-        }
-        
-        zkSetup.setUpTasks(version, clusterInfo.toArray());
-    }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.tools;
+
+import org.apache.s4.comm.util.ConfigUtils;
+import org.apache.s4.comm.util.ConfigParser;
+import org.apache.s4.comm.util.ConfigParser.Cluster;
+import org.apache.s4.comm.util.ConfigParser.Config;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command-line tool that sets up the initial tasks on ZooKeeper for every cluster found in a
+ * configuration XML file.
+ * <p>USAGE: {@code java TaskSetupApp <zk_address> [clean|setup] setup_config_xml}
+ *
+ * @author kishoreg
+ */
+public class TaskSetupApp {
+    /** Validates the arguments (usage + exit status 1 when invalid), then runs the set-up. */
+    public static void main(String[] args) {
+        String zkAddress = args.length > 0 ? args[0] : "";
+        boolean clean = false;
+        boolean setup = false;
+        String setupXml = null;
+        for (int i = 0; i < args.length; i++) {
+            String arg = args[i];
+            if (arg.equals("clean")) {
+                clean = true;
+            } else if (arg.equals("setup")) {
+                setup = true;
+            } else if (i == args.length - 1) {
+                setupXml = arg;
+            }
+        }
+        if (!(setupXml != null && new File(setupXml).exists())) {
+            printusage("Set up xml: " + setupXml + " does not exist");
+        }
+        if (!(setup || clean)) {
+            System.err.println("Invalid usage.");
+            printusage("Must specify at least one of of clean, setup.");
+        }
+        doMain(zkAddress, clean, setup, setupXml);
+    }
+
+    /** Prints {@code message} and the usage line to stderr, then exits with status 1. */
+    private static void printusage(String message) {
+        System.err.println(message);
+        System.err.println("java TaskSetupApp <zk_address> [clean|setup] setup_config_xml");
+        System.exit(1);
+    }
+
+    /** Parses {@code setupXml} and processes every cluster in it; {@code setup} is not read. */
+    private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) {
+        Config config = new ConfigParser().parse(setupXml);
+        for (Cluster cluster : config.getClusters()) {
+            processCluster(clean, zkAddress, cluster, config.getVersion());
+        }
+    }
+
+    /** Calls {@code cleanUp()} first when {@code clean} is set, then sets up the cluster's tasks. */
+    private static void processCluster(boolean clean, String zkAddress, Cluster cluster, String version) {
+        List<Map<String, String>> tasks = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false);
+        ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType());
+        if (clean) {
+            zkSetup.cleanUp();
+        }
+        zkSetup.setUpTasks(version, tasks.toArray());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
index e440f10..98499a1 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
@@ -1,38 +1,38 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.util;
-
-import java.util.Map;
-
-public class CommUtil {
-
-    public static boolean compareMaps(Map<String, Object> map1, Map<String, Object> map2) {
-        boolean equals = true;
-        if (map1.size() == map2.size()) {
-            for (String key : map1.keySet()) {
-                if (!(map2.containsKey(key) && map1.get(key)
-                                                   .equals(map2.get(key)))) {
-                    equals = false;
-                    break;
-                }
-            }
-        } else {
-            equals = false;
-        }
-        return equals;
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.util;
+
+import java.util.Map;
+
+public class CommUtil {
+    /**
+     * Tells whether both maps hold the same keys mapped to equal values. A {@code null} value
+     * equals only another {@code null} value; neither map may itself be {@code null}.
+     */
+    public static boolean compareMaps(Map<String, Object> map1, Map<String, Object> map2) {
+        if (map1.size() != map2.size()) {
+            return false;
+        }
+        for (Map.Entry<String, Object> entry : map1.entrySet()) {
+            Object value = entry.getValue();
+            Object other = map2.get(entry.getKey()); // also null for a missing key, hence containsKey
+            if (!map2.containsKey(entry.getKey()) || (value == null ? other != null : !value.equals(other))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
index 0e968d9..3b5caac 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
@@ -1,39 +1,39 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.util;
-
-public class SystemUtils {
-
-    private SystemUtils() {
-    }
-
-    public static long getPID() {
-        String processName = java.lang.management.ManagementFactory.getRuntimeMXBean()
-                                                                   .getName();
-        return Long.parseLong(processName.split("@")[0]);
-    }
-
-    public static void main(String[] args) {
-        String msg = "My PID is " + SystemUtils.getPID();
-
-        javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null,
-                                                  msg,
-                                                  "SystemUtils",
-                                                  javax.swing.JOptionPane.DEFAULT_OPTION);
-
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.util;
+
+public class SystemUtils {
+    /** Not instantiable: the class only offers static members. */
+    private SystemUtils() {
+    }
+
+    /**
+     * Returns the text before the first '@' of the runtime MX bean's name, parsed as a long.
+     * NOTE(review): assumes that name looks like "pid@hostname" - confirm for the target JVM.
+     */
+    public static long getPID() {
+        String runtimeName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
+        String pidText = runtimeName.split("@")[0];
+        return Long.parseLong(pidText);
+    }
+
+    /** Shows the value of {@link #getPID()} in a Swing confirm dialog. */
+    public static void main(String[] args) {
+        String msg = "My PID is " + getPID();
+        int optionType = javax.swing.JOptionPane.DEFAULT_OPTION;
+        javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null, msg, "SystemUtils", optionType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
index f5293b8..1588e54 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
@@ -1,64 +1,64 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.zk;
-
-public class ThreadTest {
-    static Object lock = new Object();
-
-    public static void main(String[] args) {
-
-        Thread t1 = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                while (true) {
-                    synchronized (lock) {
-                        System.out.println("In thread T1");
-                        try {
-                            System.out.println("Going to wait");
-                            long start = System.currentTimeMillis();
-                            lock.wait();
-                            long end = System.currentTimeMillis();
-                            System.out.println("Woke up T1 after :"
-                                    + (end - start) / 1000 + "secs");
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            }
-        });
-        t1.start();
-        Thread t2 = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                while (true) {
-                    synchronized (lock) {
-                        try {
-                            Thread.sleep(10000);
-                        } catch (InterruptedException e) {
-                            // TODO Auto-generated catch block
-                            e.printStackTrace();
-                        }
-                        System.out.println("In thread T2");
-                        lock.notify();
-                        break;
-                    }
-                }
-            }
-        });
-        t2.start();
-    }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+public class ThreadTest {
+    static Object lock = new Object();
+
+    public static void main(String[] args) {
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    synchronized (lock) {
+                        System.out.println("In thread T1");
+                        try {
+                            System.out.println("Going to wait");
+                            long start = System.currentTimeMillis();
+                            lock.wait();
+                            long end = System.currentTimeMillis();
+                            System.out.println("Woke up T1 after :"
+                                    + (end - start) / 1000 + "secs");
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            }
+        });
+        t1.start();
+        Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    synchronized (lock) {
+                        try {
+                            Thread.sleep(10000);
+                        } catch (InterruptedException e) {
+                            // TODO Auto-generated catch block
+                            e.printStackTrace();
+                        }
+                        System.out.println("In thread T2");
+                        lock.notify();
+                        break;
+                    }
+                }
+            }
+        });
+        t2.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
index 0fe7fa4..7434901 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
@@ -1,147 +1,147 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.zk;
-
-import org.apache.s4.comm.core.CommEventCallback;
-import org.apache.s4.comm.core.DefaultWatcher;
-import org.apache.s4.comm.core.ProcessMonitor;
-import org.apache.s4.comm.util.JSONUtil;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkProcessMonitor extends DefaultWatcher implements Runnable,
-        ProcessMonitor {
-    static Logger logger = Logger.getLogger(ZkProcessMonitor.class);
-    private List<Object> destinationList;
-    private Map<Integer, Object> destinationMap;
-    private String processZNode;
-    private Object lock = new Object();
-    private volatile boolean updateMode = false;
-    private String taskZNode;
-    private int taskCount;
-
-    public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) {
-        this(address, clusterName, clusterType, null);
-    }
-
-    public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType,
-            CommEventCallback callbackHandler) {
-        super(address, callbackHandler);
-        String root = "/" + ClusterName + "/" + clusterType.toString();
-        this.taskZNode = root + "/task";
-        this.processZNode = root + "/process";
-        destinationMap = new HashMap<Integer, Object>();
-        destinationList = new ArrayList<Object>();
-    }
-
-    public void monitor() {
-        synchronized (mutex) {
-            readConfig();
-        }
-        new Thread(this).start();
-    }
-
-    private void readConfig() {
-        try {
-            synchronized (lock) {
-                Map<Integer, Object> tempDestinationMap = new HashMap<Integer, Object>();
-                List<Object> tempDestinationList = new ArrayList<Object>();
-                updateMode = true;
-                List<String> tasks = zk.getChildren(taskZNode, false);
-                this.taskCount = tasks.size();
-                List<String> children = zk.getChildren(processZNode, false);
-                for (String name : children) {
-                    Stat stat = zk.exists(processZNode + "/" + name, false);
-                    if (stat != null) {
-                        byte[] data = zk.getData(processZNode + "/" + name,
-                                                 false,
-                                                 stat);
-                        Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(data));
-                        String key = (String) map.get("partition");
-                        if (key != null) {
-                            tempDestinationMap.put(Integer.parseInt(key), map);
-                        }
-                        tempDestinationList.add(map);
-                    }
-                }
-                destinationList.clear();
-                destinationMap.clear();
-                destinationList.addAll(tempDestinationList);
-                destinationMap.putAll(tempDestinationMap);
-                logger.info("Updated Destination List to" + destinationList);
-                logger.info("Updated Destination Map to" + destinationMap);
-            }
-        } catch (KeeperException e) {
-            logger.warn("Ignorable exception if it happens once in a while", e);
-        } catch (InterruptedException e) {
-            logger.error("Interrupted exception cause while reading process znode",
-                         e);
-        } finally {
-            updateMode = false;
-        }
-    }
-
-    public void run() {
-        try {
-            while (true) {
-                synchronized (mutex) {
-                    // set watch
-                    logger.info("Setting watch on " + processZNode);
-                    zk.getChildren(processZNode, true);
-                    readConfig();
-                    mutex.wait();
-                }
-            }
-        } catch (KeeperException e) {
-            logger.warn("KeeperException in ProcessMonitor.run", e);
-        } catch (InterruptedException e) {
-            logger.warn("InterruptedException in ProcessMonitor.run", e);
-        }
-    }
-
-    public List<Object> getDestinationList() {
-        if (updateMode) {
-            synchronized (lock) {
-                return destinationList;
-            }
-        } else {
-            return destinationList;
-        }
-    }
-
-    public Map<Integer, Object> getDestinationMap() {
-        if (updateMode) {
-            synchronized (lock) {
-                return destinationMap;
-            }
-        } else {
-            return destinationMap;
-        }
-    }
-
-    @Override
-    public int getTaskCount() {
-        return taskCount;
-    }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.core.ProcessMonitor;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkProcessMonitor extends DefaultWatcher implements Runnable,
+        ProcessMonitor {
+    static Logger logger = Logger.getLogger(ZkProcessMonitor.class);
+    private List<Object> destinationList;
+    private Map<Integer, Object> destinationMap;
+    private String processZNode;
+    private Object lock = new Object();
+    private volatile boolean updateMode = false;
+    private String taskZNode;
+    private int taskCount;
+
+    public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) {
+        this(address, clusterName, clusterType, null);
+    }
+
+    public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType,
+            CommEventCallback callbackHandler) {
+        super(address, callbackHandler);
+        String root = "/" + ClusterName + "/" + clusterType.toString();
+        this.taskZNode = root + "/task";
+        this.processZNode = root + "/process";
+        destinationMap = new HashMap<Integer, Object>();
+        destinationList = new ArrayList<Object>();
+    }
+
+    public void monitor() {
+        synchronized (mutex) {
+            readConfig();
+        }
+        new Thread(this).start();
+    }
+
+    private void readConfig() {
+        try {
+            synchronized (lock) {
+                Map<Integer, Object> tempDestinationMap = new HashMap<Integer, Object>();
+                List<Object> tempDestinationList = new ArrayList<Object>();
+                updateMode = true;
+                List<String> tasks = zk.getChildren(taskZNode, false);
+                this.taskCount = tasks.size();
+                List<String> children = zk.getChildren(processZNode, false);
+                for (String name : children) {
+                    Stat stat = zk.exists(processZNode + "/" + name, false);
+                    if (stat != null) {
+                        byte[] data = zk.getData(processZNode + "/" + name,
+                                                 false,
+                                                 stat);
+                        Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(data));
+                        String key = (String) map.get("partition");
+                        if (key != null) {
+                            tempDestinationMap.put(Integer.parseInt(key), map);
+                        }
+                        tempDestinationList.add(map);
+                    }
+                }
+                destinationList.clear();
+                destinationMap.clear();
+                destinationList.addAll(tempDestinationList);
+                destinationMap.putAll(tempDestinationMap);
+                logger.info("Updated Destination List to" + destinationList);
+                logger.info("Updated Destination Map to" + destinationMap);
+            }
+        } catch (KeeperException e) {
+            logger.warn("Ignorable exception if it happens once in a while", e);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted exception cause while reading process znode",
+                         e);
+        } finally {
+            updateMode = false;
+        }
+    }
+
+    public void run() {
+        try {
+            while (true) {
+                synchronized (mutex) {
+                    // set watch
+                    logger.info("Setting watch on " + processZNode);
+                    zk.getChildren(processZNode, true);
+                    readConfig();
+                    mutex.wait();
+                }
+            }
+        } catch (KeeperException e) {
+            logger.warn("KeeperException in ProcessMonitor.run", e);
+        } catch (InterruptedException e) {
+            logger.warn("InterruptedException in ProcessMonitor.run", e);
+        }
+    }
+
+    public List<Object> getDestinationList() {
+        if (updateMode) {
+            synchronized (lock) {
+                return destinationList;
+            }
+        } else {
+            return destinationList;
+        }
+    }
+
+    public Map<Integer, Object> getDestinationMap() {
+        if (updateMode) {
+            synchronized (lock) {
+                return destinationMap;
+            }
+        } else {
+            return destinationMap;
+        }
+    }
+
+    @Override
+    public int getTaskCount() {
+        return taskCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
index 0b58018..1003baa 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
@@ -1,282 +1,282 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.zk;
-
-import org.apache.s4.comm.core.CommEventCallback;
-import org.apache.s4.comm.core.DefaultWatcher;
-import org.apache.s4.comm.util.CommUtil;
-import org.apache.s4.comm.util.JSONUtil;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkTaskSetup extends DefaultWatcher {
-    static Logger logger = Logger.getLogger(ZkTaskSetup.class);
-    String tasksListRoot;
-    String processListRoot;
-
-    public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) {
-        this(address, clusterName, clusterType, null);
-    }
-
-    /**
-     * Constructor of ZkTaskSetup
-     * 
-     * @param address
-     * @param clusterName
-     */
-    public ZkTaskSetup(String address, String clusterName, ClusterType clusterType,
-            CommEventCallback callbackHandler) {
-        super(address, callbackHandler);
-        
-        this.root = "/" + clusterName + "/" + clusterType.toString();
-        this.tasksListRoot = root + "/task";
-        this.processListRoot = root + "/process";
-    }
-
-    public void setUpTasks(Object[] data) {
-        setUpTasks("-1", data);
-    }
-
-    /**
-     * Creates task nodes.
-     * 
-     * @param version
-     * @param data
-     */
-    public void setUpTasks(String version, Object[] data) {
-        try {
-            logger.info("Trying to set up configuration with new version:"
-                    + version);
-            if (!version.equals("-1")) {
-                if (!isConfigVersionNewer(version)) {
-                    logger.info("Config version not newer than current version");
-                    return;
-                } else {
-                    cleanUp();
-                }
-            } else {
-                logger.info("Not checking version number since it is set to -1");
-            }
-
-            // check if config data newer
-            if (!isConfigDataNewer(data)) {
-                logger.info("Config data not newer than current version");
-                return;
-            } else {
-                logger.info("Found newer Config data. Cleaning old data");
-                cleanUp();
-            }
-
-            // Create ZK node name
-            if (zk != null) {
-                Stat s;
-                s = zk.exists(root, false);
-                if (s == null) {
-                    String parent = new File(root).getParent()
-                                                  .replace(File.separatorChar,
-                                                           '/');
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("parent:" + parent);
-                    }
-                    Stat exists = zk.exists(parent, false);
-                    if (exists == null) {
-                        zk.create(parent,
-                                  new byte[0],
-                                  Ids.OPEN_ACL_UNSAFE,
-                                  CreateMode.PERSISTENT);
-                    }
-                    zk.create(root,
-                              new byte[0],
-                              Ids.OPEN_ACL_UNSAFE,
-                              CreateMode.PERSISTENT);
-                }
-            }
-            Stat s;
-            s = zk.exists(tasksListRoot, false);
-            if (s == null) {
-                Map<String, String> map = new HashMap<String, String>();
-                map.put("config.version", version);
-                String jsonString = JSONUtil.toJsonString(map);
-                zk.create(tasksListRoot,
-                          jsonString.getBytes(),
-                          Ids.OPEN_ACL_UNSAFE,
-                          CreateMode.PERSISTENT);
-            }
-            s = zk.exists(processListRoot, false);
-            if (s == null) {
-                zk.create(processListRoot,
-                          new byte[0],
-                          Ids.OPEN_ACL_UNSAFE,
-                          CreateMode.PERSISTENT);
-
-            }
-
-            for (int i = 0; i < data.length; i++) {
-                String nodeName = tasksListRoot + "/" + "task" + "-" + i;
-                Stat sTask = zk.exists(nodeName, false);
-                if (sTask == null) {
-                    logger.info("Creating taskNode: " + nodeName);
-                    byte[] byteBuffer = JSONUtil.toJsonString(data[i])
-                                                .getBytes();
-                    zk.create(nodeName,
-                              byteBuffer,
-                              Ids.OPEN_ACL_UNSAFE,
-                              CreateMode.PERSISTENT);
-                } else {
-                    logger.warn("TaskNode already exisits: " + nodeName);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Keeper exception when creating task nodes: "
-                                 + e.toString(),
-                         e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private boolean isConfigDataNewer(Object[] data) {
-        try {
-            Stat s;
-            s = zk.exists(tasksListRoot, false);
-            if (s != null) {
-                List<String> children = zk.getChildren(tasksListRoot, false);
-                if (children.size() != data.length) {
-                    return true;
-                }
-                boolean[] matched = new boolean[data.length];
-                for (String child : children) {
-                    String childPath = tasksListRoot + "/" + child;
-                    Stat sTemp = zk.exists(childPath, false);
-                    byte[] tempData = zk.getData(tasksListRoot + "/" + child,
-                                                 false,
-                                                 sTemp);
-                    Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
-
-                    // check if it matches any of the data
-                    for (int i = 0; i < data.length; i++) {
-                        Map<String, Object> newData = (Map<String, Object>) data[i];
-                        if (!matched[i] && CommUtil.compareMaps(newData, map)) {
-                            matched[i] = true;
-                            break;
-                        }
-                    }
-                }
-                for (int i = 0; i < matched.length; i++) {
-                    if (!matched[i]) {
-                        return true;
-                    }
-                }
-            } else {
-                return true;
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(" Exception in isConfigDataNewer method ",
-                                       e);
-        }
-        return false;
-    }
-
-    private boolean isConfigVersionNewer(String version) throws Exception {
-        Stat s;
-        s = zk.exists(tasksListRoot, false);
-        if (s != null) {
-            byte[] data = zk.getData(tasksListRoot, false, s);
-            if (data != null && data.length > 0) {
-                String jsonString = new String(data);
-                if (jsonString != null) {
-                    Map<String, Object> map = JSONUtil.getMapFromJson(jsonString);
-                    if (map.containsKey("config.version")) {
-                        boolean update = false;
-                        String currentVersion = map.get("config.version")
-                                                   .toString();
-                        logger.info("Current config version:" + currentVersion);
-                        String[] curV = currentVersion.split("\\.");
-                        String[] newV = version.split("\\.");
-                        for (int i = 0; i < Math.max(curV.length, newV.length); i++) {
-                            if (Integer.parseInt(newV[i]) > Integer.parseInt(curV[i])) {
-                                update = true;
-                                break;
-                            }
-                        }
-                        if (!update) {
-                            logger.info("Current config version is newer. Config will not be updated");
-                        }
-                        return update;
-                    }
-                } else {
-                    logger.info("No data at znode " + tasksListRoot
-                            + " so version checking will not be done");
-                }
-            } else {
-                logger.info("No data at znode " + tasksListRoot
-                        + " so version checking will not be done");
-            }
-        } else {
-            logger.info("znode " + tasksListRoot
-                    + " does not exist, so creating new one is fine");
-        }
-        return true;
-    }
-
-    /**
-     * Will clean up taskList Node and process List Node
-     */
-    public boolean cleanUp() {
-        try {
-            logger.info("Cleaning :" + tasksListRoot);
-            Stat exists = zk.exists(tasksListRoot, false);
-            if (exists != null) {
-                List<String> children = zk.getChildren(tasksListRoot, false);
-                if (children.size() > 0) {
-                    for (String child : children) {
-                        logger.info("Cleaning :" + tasksListRoot + "/" + child);
-                        zk.delete(tasksListRoot + "/" + child, 0);
-                    }
-                }
-                zk.delete(tasksListRoot, 0);
-            }
-
-            exists = zk.exists(processListRoot, false);
-            if (exists != null) {
-                List<String> children = zk.getChildren(processListRoot, false);
-                if (children.size() > 0) {
-                    logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior");
-                    for (String child : children) {
-                        logger.info("Cleaning :" + processListRoot + "/"
-                                + child);
-                        zk.delete(processListRoot + "/" + child, 0);
-                    }
-                }
-                logger.info("Finished cleaning :" + processListRoot);
-                zk.delete(processListRoot, 0);
-            }
-            return true;
-        } catch (Exception e) {
-            logger.error("Exception while cleaning up: " + e.getMessage(), e);
-            return false;
-        }
-    }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.util.CommUtil;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkTaskSetup extends DefaultWatcher {
+    static Logger logger = Logger.getLogger(ZkTaskSetup.class);
+    String tasksListRoot;
+    String processListRoot;
+
+    public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) {
+        this(address, clusterName, clusterType, null);
+    }
+
+    /**
+     * Constructor of ZkTaskSetup
+     * 
+     * @param address
+     * @param clusterName
+     */
+    public ZkTaskSetup(String address, String clusterName, ClusterType clusterType,
+            CommEventCallback callbackHandler) {
+        super(address, callbackHandler);
+        
+        this.root = "/" + clusterName + "/" + clusterType.toString();
+        this.tasksListRoot = root + "/task";
+        this.processListRoot = root + "/process";
+    }
+
+    public void setUpTasks(Object[] data) {
+        setUpTasks("-1", data);
+    }
+
+    /**
+     * Creates task nodes.
+     * 
+     * @param version
+     * @param data
+     */
+    public void setUpTasks(String version, Object[] data) {
+        try {
+            logger.info("Trying to set up configuration with new version:"
+                    + version);
+            if (!version.equals("-1")) {
+                if (!isConfigVersionNewer(version)) {
+                    logger.info("Config version not newer than current version");
+                    return;
+                } else {
+                    cleanUp();
+                }
+            } else {
+                logger.info("Not checking version number since it is set to -1");
+            }
+
+            // check if config data newer
+            if (!isConfigDataNewer(data)) {
+                logger.info("Config data not newer than current version");
+                return;
+            } else {
+                logger.info("Found newer Config data. Cleaning old data");
+                cleanUp();
+            }
+
+            // Create ZK node name
+            if (zk != null) {
+                Stat s;
+                s = zk.exists(root, false);
+                if (s == null) {
+                    String parent = new File(root).getParent()
+                                                  .replace(File.separatorChar,
+                                                           '/');
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("parent:" + parent);
+                    }
+                    Stat exists = zk.exists(parent, false);
+                    if (exists == null) {
+                        zk.create(parent,
+                                  new byte[0],
+                                  Ids.OPEN_ACL_UNSAFE,
+                                  CreateMode.PERSISTENT);
+                    }
+                    zk.create(root,
+                              new byte[0],
+                              Ids.OPEN_ACL_UNSAFE,
+                              CreateMode.PERSISTENT);
+                }
+            }
+            Stat s;
+            s = zk.exists(tasksListRoot, false);
+            if (s == null) {
+                Map<String, String> map = new HashMap<String, String>();
+                map.put("config.version", version);
+                String jsonString = JSONUtil.toJsonString(map);
+                zk.create(tasksListRoot,
+                          jsonString.getBytes(),
+                          Ids.OPEN_ACL_UNSAFE,
+                          CreateMode.PERSISTENT);
+            }
+            s = zk.exists(processListRoot, false);
+            if (s == null) {
+                zk.create(processListRoot,
+                          new byte[0],
+                          Ids.OPEN_ACL_UNSAFE,
+                          CreateMode.PERSISTENT);
+
+            }
+
+            for (int i = 0; i < data.length; i++) {
+                String nodeName = tasksListRoot + "/" + "task" + "-" + i;
+                Stat sTask = zk.exists(nodeName, false);
+                if (sTask == null) {
+                    logger.info("Creating taskNode: " + nodeName);
+                    byte[] byteBuffer = JSONUtil.toJsonString(data[i])
+                                                .getBytes();
+                    zk.create(nodeName,
+                              byteBuffer,
+                              Ids.OPEN_ACL_UNSAFE,
+                              CreateMode.PERSISTENT);
+                } else {
+                    logger.warn("TaskNode already exisits: " + nodeName);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Keeper exception when creating task nodes: "
+                                 + e.toString(),
+                         e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private boolean isConfigDataNewer(Object[] data) {
+        try {
+            Stat s;
+            s = zk.exists(tasksListRoot, false);
+            if (s != null) {
+                List<String> children = zk.getChildren(tasksListRoot, false);
+                if (children.size() != data.length) {
+                    return true;
+                }
+                boolean[] matched = new boolean[data.length];
+                for (String child : children) {
+                    String childPath = tasksListRoot + "/" + child;
+                    Stat sTemp = zk.exists(childPath, false);
+                    byte[] tempData = zk.getData(tasksListRoot + "/" + child,
+                                                 false,
+                                                 sTemp);
+                    Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
+
+                    // check if it matches any of the data
+                    for (int i = 0; i < data.length; i++) {
+                        Map<String, Object> newData = (Map<String, Object>) data[i];
+                        if (!matched[i] && CommUtil.compareMaps(newData, map)) {
+                            matched[i] = true;
+                            break;
+                        }
+                    }
+                }
+                for (int i = 0; i < matched.length; i++) {
+                    if (!matched[i]) {
+                        return true;
+                    }
+                }
+            } else {
+                return true;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(" Exception in isConfigDataNewer method ",
+                                       e);
+        }
+        return false;
+    }
+
+    private boolean isConfigVersionNewer(String version) throws Exception {
+        Stat s;
+        s = zk.exists(tasksListRoot, false);
+        if (s != null) {
+            byte[] data = zk.getData(tasksListRoot, false, s);
+            if (data != null && data.length > 0) {
+                String jsonString = new String(data);
+                if (jsonString != null) {
+                    Map<String, Object> map = JSONUtil.getMapFromJson(jsonString);
+                    if (map.containsKey("config.version")) {
+                        boolean update = false;
+                        String currentVersion = map.get("config.version")
+                                                   .toString();
+                        logger.info("Current config version:" + currentVersion);
+                        String[] curV = currentVersion.split("\\.");
+                        String[] newV = version.split("\\.");
+                        for (int i = 0; i < Math.max(curV.length, newV.length); i++) {
+                            if (Integer.parseInt(newV[i]) > Integer.parseInt(curV[i])) {
+                                update = true;
+                                break;
+                            }
+                        }
+                        if (!update) {
+                            logger.info("Current config version is newer. Config will not be updated");
+                        }
+                        return update;
+                    }
+                } else {
+                    logger.info("No data at znode " + tasksListRoot
+                            + " so version checking will not be done");
+                }
+            } else {
+                logger.info("No data at znode " + tasksListRoot
+                        + " so version checking will not be done");
+            }
+        } else {
+            logger.info("znode " + tasksListRoot
+                    + " does not exist, so creating new one is fine");
+        }
+        return true;
+    }
+
+    /**
+     * Will clean up taskList Node and process List Node
+     */
+    public boolean cleanUp() {
+        try {
+            logger.info("Cleaning :" + tasksListRoot);
+            Stat exists = zk.exists(tasksListRoot, false);
+            if (exists != null) {
+                List<String> children = zk.getChildren(tasksListRoot, false);
+                if (children.size() > 0) {
+                    for (String child : children) {
+                        logger.info("Cleaning :" + tasksListRoot + "/" + child);
+                        zk.delete(tasksListRoot + "/" + child, 0);
+                    }
+                }
+                zk.delete(tasksListRoot, 0);
+            }
+
+            exists = zk.exists(processListRoot, false);
+            if (exists != null) {
+                List<String> children = zk.getChildren(processListRoot, false);
+                if (children.size() > 0) {
+                    logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior");
+                    for (String child : children) {
+                        logger.info("Cleaning :" + processListRoot + "/"
+                                + child);
+                        zk.delete(processListRoot + "/" + child, 0);
+                    }
+                }
+                logger.info("Finished cleaning :" + processListRoot);
+                zk.delete(processListRoot, 0);
+            }
+            return true;
+        } catch (Exception e) {
+            logger.error("Exception while cleaning up: " + e.getMessage(), e);
+            return false;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
index c743db8..1ee2ccb 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
@@ -1,144 +1,144 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm.zk;
-
-import org.apache.s4.comm.core.DefaultWatcher;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-public class ZkUtil extends DefaultWatcher {
-
-    public ZkUtil(String address) {
-        super(address);
-
-    }
-
-    public int getChildCount(String path) {
-        try {
-            List<String> children = zk.getChildren(path, false);
-            return children.size();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<String> getChildren(String path) {
-        try {
-            List<String> children = zk.getChildren(path, false);
-            return children;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public byte[] getData(String path) {
-        try {
-            byte[] data = zk.getData(path, false, null);
-            return data;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public void create(String path) {
-        create(path, "");
-    }
-
-    public void create(String path, String data) {
-        try {
-            zk.create(path,
-                      data.getBytes(),
-                      Ids.OPEN_ACL_UNSAFE,
-                      CreateMode.PERSISTENT);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public void deleteRecursive(String path) {
-        List<String> children = getChildren(path);
-        for (String child : children) {
-            deleteRecursive(path + "/" + child);
-        }
-        delete(path);
-    }
-
-    public void delete(String path) {
-        try {
-            zk.delete(path, -1);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        if (args.length == 0) {
-            printUsage();
-
-        }
-        String address = args[0];
-        String methodName = args[1];
-
-        String[] methodArgs = new String[args.length - 2];
-        for (int i = 2; i < args.length; i++) {
-            methodArgs[i - 2] = args[i];
-        }
-        Method[] methods = ZkUtil.class.getMethods();
-        Method method = null;
-        for (Method met : methods) {
-            if (met.getName().equals(methodName)
-                    && met.getParameterTypes().length == methodArgs.length) {
-                method = met;
-                break;
-            }
-        }
-
-        if (method != null) {
-            ZkUtil zkUtil = new ZkUtil(address);
-            Object ret = method.invoke(zkUtil, (Object[]) methodArgs);
-            if (ret != null) {
-                System.out.println("**********");
-                System.out.println(ret);
-                System.out.println("**********");
-            }
-        } else {
-            printUsage();
-        }
-        // zkUtil.deleteRecursive("/s4/listener/process/task-0");
-        // zkUtil.create("/s4_apps_test/sender/process");
-    }
-
-    private static void printUsage() {
-        System.out.println("USAGE");
-        System.out.println("java <zkadress> methodName arguments");
-        Method[] methods = ZkUtil.class.getMethods();
-        for (Method met : methods) {
-            System.out.println(met.getName() + ":"
-                    + Arrays.toString(met.getParameterTypes()));
-        }
-        System.exit(1);
-    }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.DefaultWatcher;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+public class ZkUtil extends DefaultWatcher {
+
+    public ZkUtil(String address) {
+        super(address);
+
+    }
+
+    public int getChildCount(String path) {
+        try {
+            List<String> children = zk.getChildren(path, false);
+            return children.size();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<String> getChildren(String path) {
+        try {
+            List<String> children = zk.getChildren(path, false);
+            return children;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public byte[] getData(String path) {
+        try {
+            byte[] data = zk.getData(path, false, null);
+            return data;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public void create(String path) {
+        create(path, "");
+    }
+
+    public void create(String path, String data) {
+        try {
+            zk.create(path,
+                      data.getBytes(),
+                      Ids.OPEN_ACL_UNSAFE,
+                      CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public void deleteRecursive(String path) {
+        List<String> children = getChildren(path);
+        for (String child : children) {
+            deleteRecursive(path + "/" + child);
+        }
+        delete(path);
+    }
+
+    public void delete(String path) {
+        try {
+            zk.delete(path, -1);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            printUsage();
+
+        }
+        String address = args[0];
+        String methodName = args[1];
+
+        String[] methodArgs = new String[args.length - 2];
+        for (int i = 2; i < args.length; i++) {
+            methodArgs[i - 2] = args[i];
+        }
+        Method[] methods = ZkUtil.class.getMethods();
+        Method method = null;
+        for (Method met : methods) {
+            if (met.getName().equals(methodName)
+                    && met.getParameterTypes().length == methodArgs.length) {
+                method = met;
+                break;
+            }
+        }
+
+        if (method != null) {
+            ZkUtil zkUtil = new ZkUtil(address);
+            Object ret = method.invoke(zkUtil, (Object[]) methodArgs);
+            if (ret != null) {
+                System.out.println("**********");
+                System.out.println(ret);
+                System.out.println("**********");
+            }
+        } else {
+            printUsage();
+        }
+        // zkUtil.deleteRecursive("/s4/listener/process/task-0");
+        // zkUtil.create("/s4_apps_test/sender/process");
+    }
+
+    private static void printUsage() {
+        System.out.println("USAGE");
+        System.out.println("java <zkadress> methodName arguments");
+        Method[] methods = ZkUtil.class.getMethods();
+        for (Method met : methods) {
+            System.out.println(met.getName() + ":"
+                    + Arrays.toString(met.getParameterTypes()));
+        }
+        System.exit(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/resources/log4j.xml b/s4-comm/src/main/resources/log4j.xml
index 3e3c2ce..7a51b1e 100644
--- a/s4-comm/src/main/resources/log4j.xml
+++ b/s4-comm/src/main/resources/log4j.xml
@@ -1,33 +1,33 @@
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"	>
-	<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
-		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %5p %c{2}: %m%n" />
-		</layout>
-	</appender>
-
-	<appender name="ASYNC" class="org.apache.log4j.AsyncAppender">
-		<appender-ref ref="CONSOLE" />
-		<appender-ref ref="FILE" />
-	</appender>
-
-	<appender name="FILE" class="org.apache.log4j.RollingFileAppender">
-		<param name="File" value="comm.log" />
-		<param name="MaxFileSize" value="1MB" />
-		<param name="MaxBackupIndex" value="100" />
-		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %5p %c{2}: %m%n" />
-		</layout>
-	</appender>
-
-	<logger name="zk">
-	  <level value="info"/>
-	  <appender-ref ref="CONSOLE" /> 
-	</logger>
-<!-- 
-	<root>
-		<priority value="debug" />
-		<appender-ref ref="CONSOLE" />
-		<appender-ref ref="ASYNC" />
-	</root>
-	 -->
-</log4j:configuration>
\ No newline at end of file
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"	>
+	<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+		<layout class="org.apache.log4j.PatternLayout">
+			<param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %5p %c{2}: %m%n" />
+		</layout>
+	</appender>
+
+	<appender name="ASYNC" class="org.apache.log4j.AsyncAppender">
+		<appender-ref ref="CONSOLE" />
+		<appender-ref ref="FILE" />
+	</appender>
+
+	<appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+		<param name="File" value="comm.log" />
+		<param name="MaxFileSize" value="1MB" />
+		<param name="MaxBackupIndex" value="100" />
+		<layout class="org.apache.log4j.PatternLayout">
+			<param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %5p %c{2}: %m%n" />
+		</layout>
+	</appender>
+
+	<logger name="zk">
+	  <level value="info"/>
+	  <appender-ref ref="CONSOLE" /> 
+	</logger>
+<!-- 
+	<root>
+		<priority value="debug" />
+		<appender-ref ref="CONSOLE" />
+		<appender-ref ref="ASYNC" />
+	</root>
+	 -->
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/sample_static_task_manager_test.xml
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/resources/sample_static_task_manager_test.xml b/s4-comm/src/main/resources/sample_static_task_manager_test.xml
index 11912aa..072735a 100644
--- a/s4-comm/src/main/resources/sample_static_task_manager_test.xml
+++ b/s4-comm/src/main/resources/sample_static_task_manager_test.xml
@@ -1,15 +1,15 @@
-<config>
-	<list name="process.list">
-		<item>s4_listener_process</item>	
-	</list>
-	<hash name="s4_listener_process.config">
-		<item name="num.process">2</item>
-		<item name="app.name">s4</item>			
-		<item name="task.type">listener</item>			
-		<item name="mode">unicast</item>
-		<item name="partition">0,1</item>
-		<item name="port">5001,5002</item>
-		<item name="process.host">halfalways-lx</item>
-		<item name="ID">PROCESS_1,PROCESS_2</item>
-	</hash>	
-</config>
\ No newline at end of file
+<config>
+	<list name="process.list">
+		<item>s4_listener_process</item>	
+	</list>
+	<hash name="s4_listener_process.config">
+		<item name="num.process">2</item>
+		<item name="app.name">s4</item>			
+		<item name="task.type">listener</item>			
+		<item name="mode">unicast</item>
+		<item name="partition">0,1</item>
+		<item name="port">5001,5002</item>
+		<item name="process.host">halfalways-lx</item>
+		<item name="ID">PROCESS_1,PROCESS_2</item>
+	</hash>	
+</config>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/sample_task_setup.xml
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/resources/sample_task_setup.xml b/s4-comm/src/main/resources/sample_task_setup.xml
index b518913..bd59c77 100644
--- a/s4-comm/src/main/resources/sample_task_setup.xml
+++ b/s4-comm/src/main/resources/sample_task_setup.xml
@@ -1,23 +1,23 @@
-<config>
-	    
-	<list name="tasks.list">
-		<item>ep_tasks</item>	
-	</list>
-	<!-- This version must be upped for every change -->
-	<scalar name="ep_tasks.version">1.0.0.0</scalar>
-	<scalar name="s4_tasks.version">1.0.0.0</scalar>
-	<hash name="s4_tasks.config">
-		<item name="num.tasks">2</item>
-		<item name="app.name">s4</item>			
-		<item name="task.type">listener</item>			
-		<item name="mode">unicast</item>
-		<item name="partition">0,1</item>
-		<item name="port">5001,5002</item>
-	</hash>	
-	<hash name="ep_tasks.config">
-		<item name="num.tasks">1</item>
-		<item name="app.name">s4_event_pipe</item>			
-		<item name="task.type">sender</item>			
-		<item name="mode">unicast</item>
-	</hash>	
-</config>
\ No newline at end of file
+<config>
+	    
+	<list name="tasks.list">
+		<item>ep_tasks</item>	
+	</list>
+	<!-- This version must be upped for every change -->
+	<scalar name="ep_tasks.version">1.0.0.0</scalar>
+	<scalar name="s4_tasks.version">1.0.0.0</scalar>
+	<hash name="s4_tasks.config">
+		<item name="num.tasks">2</item>
+		<item name="app.name">s4</item>			
+		<item name="task.type">listener</item>			
+		<item name="mode">unicast</item>
+		<item name="partition">0,1</item>
+		<item name="port">5001,5002</item>
+	</hash>	
+	<hash name="ep_tasks.config">
+		<item name="num.tasks">1</item>
+		<item name="app.name">s4_event_pipe</item>			
+		<item name="task.type">sender</item>			
+		<item name="mode">unicast</item>
+	</hash>	
+</config>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/taskManagerTest.xml
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/resources/taskManagerTest.xml b/s4-comm/src/main/resources/taskManagerTest.xml
index 11912aa..072735a 100644
--- a/s4-comm/src/main/resources/taskManagerTest.xml
+++ b/s4-comm/src/main/resources/taskManagerTest.xml
@@ -1,15 +1,15 @@
-<config>
-	<list name="process.list">
-		<item>s4_listener_process</item>	
-	</list>
-	<hash name="s4_listener_process.config">
-		<item name="num.process">2</item>
-		<item name="app.name">s4</item>			
-		<item name="task.type">listener</item>			
-		<item name="mode">unicast</item>
-		<item name="partition">0,1</item>
-		<item name="port">5001,5002</item>
-		<item name="process.host">halfalways-lx</item>
-		<item name="ID">PROCESS_1,PROCESS_2</item>
-	</hash>	
-</config>
\ No newline at end of file
+<config>
+	<list name="process.list">
+		<item>s4_listener_process</item>	
+	</list>
+	<hash name="s4_listener_process.config">
+		<item name="num.process">2</item>
+		<item name="app.name">s4</item>			
+		<item name="task.type">listener</item>			
+		<item name="mode">unicast</item>
+		<item name="partition">0,1</item>
+		<item name="port">5001,5002</item>
+		<item name="process.host">halfalways-lx</item>
+		<item name="ID">PROCESS_1,PROCESS_2</item>
+	</hash>	
+</config>