You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:14 UTC
[9/50] [abbrv] Remove troublesome carriage return
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java b/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
index 0ea56a2..4405dd7 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
@@ -1,82 +1,82 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.test;
-
-import org.apache.s4.comm.core.TaskManager;
-import org.apache.s4.comm.file.StaticTaskManager;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import org.apache.s4.comm.zk.ZkTaskSetup;
-import org.apache.s4.comm.zk.ZkTaskManager;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TaskManagerTest {
- public static void main(String[] args) throws Exception {
- // testZkTaskManager(args);
- testStaticTaskManager(args);
- Thread.sleep(10000);
- }
-
- private static void testStaticTaskManager(String[] args) {
- String address = null;
- address = "localhost:2181";
- TaskManager taskManager = new StaticTaskManager(address,
- "taskmanagerTest",
- ClusterType.S4,
- null);
- Map<String, String> customTaskData = new HashMap<String, String>();
- Object acquireTask = taskManager.acquireTask(customTaskData);
- System.out.println("Acuired Task:" + acquireTask);
-
- }
-
- private static void testZkTaskManager(String[] args) {
- System.out.println("Here");
- // "effortfell.greatamerica.corp.yahoo.com:2181"
- String address = args[0];
- address = "localhost:2181";
- String processName = args[1];
- ZkTaskSetup taskSetup = new ZkTaskSetup(address,
- "/taskmanagerTest",
- ClusterType.S4);
- taskSetup.cleanUp();
- taskSetup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" });
- Object obj;
- System.out.println(processName + " Going to Wait for a task");
- HashMap<String, String> map = new HashMap<String, String>();
- ZkTaskManager taskManager = new ZkTaskManager(address,
- "/taskmanagerTest",
- ClusterType.S4);
- obj = taskManager.acquireTask(map);
- System.out.println(processName + "taking up task: " + obj);
- File f = new File("c:/" + obj + ".file");
- f.delete();
- while (true) {
- if (f.exists()) {
- break;
- }
- System.out.println(processName + " processing task: " + obj);
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- System.out.println("Exiting task:" + obj);
- }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.core.TaskManager;
+import org.apache.s4.comm.file.StaticTaskManager;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TaskManagerTest {
+ public static void main(String[] args) throws Exception {
+ // testZkTaskManager(args);
+ testStaticTaskManager(args);
+ Thread.sleep(10000);
+ }
+
+ private static void testStaticTaskManager(String[] args) {
+ String address = null;
+ address = "localhost:2181";
+ TaskManager taskManager = new StaticTaskManager(address,
+ "taskmanagerTest",
+ ClusterType.S4,
+ null);
+ Map<String, String> customTaskData = new HashMap<String, String>();
+ Object acquireTask = taskManager.acquireTask(customTaskData);
+ System.out.println("Acuired Task:" + acquireTask);
+
+ }
+
+ private static void testZkTaskManager(String[] args) {
+ System.out.println("Here");
+ // "effortfell.greatamerica.corp.yahoo.com:2181"
+ String address = args[0];
+ address = "localhost:2181";
+ String processName = args[1];
+ ZkTaskSetup taskSetup = new ZkTaskSetup(address,
+ "/taskmanagerTest",
+ ClusterType.S4);
+ taskSetup.cleanUp();
+ taskSetup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" });
+ Object obj;
+ System.out.println(processName + " Going to Wait for a task");
+ HashMap<String, String> map = new HashMap<String, String>();
+ ZkTaskManager taskManager = new ZkTaskManager(address,
+ "/taskmanagerTest",
+ ClusterType.S4);
+ obj = taskManager.acquireTask(map);
+ System.out.println(processName + "taking up task: " + obj);
+ File f = new File("c:/" + obj + ".file");
+ f.delete();
+ while (true) {
+ if (f.exists()) {
+ break;
+ }
+ System.out.println(processName + " processing task: " + obj);
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ System.out.println("Exiting task:" + obj);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java b/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
index d0c80c0..9cac542 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
@@ -1,122 +1,122 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.test;
-
-import org.apache.s4.comm.util.CommUtil;
-import org.apache.s4.comm.util.JSONUtil;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import org.apache.s4.comm.zk.ZkTaskSetup;
-import org.apache.s4.comm.zk.ZkTaskManager;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-public class TestTaskSetupApp {
-
- public static void main(String[] args) throws Exception {
- new TestTaskSetupApp().testTaskSetup1();
- }
-
- // test the case
- public void testTaskSetup1() throws Exception {
- String address = "effortfell.greatamerica.corp.yahoo.com:2181";
- Watcher watcher = new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
-
- }
-
- };
- // setup
- ZooKeeper zk = new ZooKeeper(address, 30000, watcher);
- String root = "/tasksetup_app_test";
- ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4);
- Map<String, String> task1 = new HashMap<String, String>();
- task1.put("name", "task-1");
-
- Map<String, String> task2 = new HashMap<String, String>();
- task2.put("name", "task-2");
- String tasksListRoot = root + "/tasks";
- zkSetup.cleanUp();
- Stat exists = zk.exists(tasksListRoot, false);
- myassert(exists == null);
- Object[] data = new Object[] { task1, task2 };
- zkSetup.setUpTasks(data);
-
- // verify that tasks are created
- exists = zk.exists(tasksListRoot, false);
- myassert(exists != null);
- List<String> children = zk.getChildren(tasksListRoot, false);
- myassert(children.size() == data.length);
- boolean[] matched = new boolean[data.length];
- for (String child : children) {
- System.out.println(child);
- String childPath = tasksListRoot + "/" + child;
- Stat sTemp = zk.exists(childPath, false);
- byte[] tempData = zk.getData(tasksListRoot + "/" + child,
- false,
- sTemp);
- Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
- // check if it matches any of the data
- for (int i = 0; i < data.length; i++) {
- Map<String, Object> newData = (Map<String, Object>) data[i];
- if (!matched[i] && CommUtil.compareMaps(newData, map)) {
- matched[i] = true;
- break;
- }
- }
- }
- for (int i = 0; i < matched.length; i++) {
- myassert(matched[i]);
- }
-
- // try running again and make verify new node is not created
- Stat oldStat = zk.exists(tasksListRoot, false);
- System.out.println("oldStat=" + oldStat);
- zkSetup.setUpTasks(data);
- Stat newStat = zk.exists(tasksListRoot, false);
- System.out.println("newstat=" + newStat);
- myassert(oldStat.getMtime() == newStat.getMtime());
-
- // make change to task config and try running again and verify new
- // config is uploaded
- oldStat = zk.exists(tasksListRoot, false);
- System.out.println("oldStat=" + oldStat.getVersion());
- ((Map<String, String>) data[data.length - 1]).put("name", "changedname");
- zkSetup.setUpTasks(data);
- newStat = zk.exists(tasksListRoot, false);
- System.out.println("newstat=" + newStat.getVersion());
- System.out.println();
- myassert(oldStat.getMtime() != newStat.getMtime());
-
- // ensure version change is working
- zkSetup.setUpTasks("1.0.0.0", data);
- }
-
- private void myassert(boolean b) {
- if (!b) {
- throw new AssertionError();
- }
- }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.util.CommUtil;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class TestTaskSetupApp {
+
+ public static void main(String[] args) throws Exception {
+ new TestTaskSetupApp().testTaskSetup1();
+ }
+
+ // test the case
+ public void testTaskSetup1() throws Exception {
+ String address = "effortfell.greatamerica.corp.yahoo.com:2181";
+ Watcher watcher = new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+
+ }
+
+ };
+ // setup
+ ZooKeeper zk = new ZooKeeper(address, 30000, watcher);
+ String root = "/tasksetup_app_test";
+ ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4);
+ Map<String, String> task1 = new HashMap<String, String>();
+ task1.put("name", "task-1");
+
+ Map<String, String> task2 = new HashMap<String, String>();
+ task2.put("name", "task-2");
+ String tasksListRoot = root + "/tasks";
+ zkSetup.cleanUp();
+ Stat exists = zk.exists(tasksListRoot, false);
+ myassert(exists == null);
+ Object[] data = new Object[] { task1, task2 };
+ zkSetup.setUpTasks(data);
+
+ // verify that tasks are created
+ exists = zk.exists(tasksListRoot, false);
+ myassert(exists != null);
+ List<String> children = zk.getChildren(tasksListRoot, false);
+ myassert(children.size() == data.length);
+ boolean[] matched = new boolean[data.length];
+ for (String child : children) {
+ System.out.println(child);
+ String childPath = tasksListRoot + "/" + child;
+ Stat sTemp = zk.exists(childPath, false);
+ byte[] tempData = zk.getData(tasksListRoot + "/" + child,
+ false,
+ sTemp);
+ Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
+ // check if it matches any of the data
+ for (int i = 0; i < data.length; i++) {
+ Map<String, Object> newData = (Map<String, Object>) data[i];
+ if (!matched[i] && CommUtil.compareMaps(newData, map)) {
+ matched[i] = true;
+ break;
+ }
+ }
+ }
+ for (int i = 0; i < matched.length; i++) {
+ myassert(matched[i]);
+ }
+
+ // try running again and make verify new node is not created
+ Stat oldStat = zk.exists(tasksListRoot, false);
+ System.out.println("oldStat=" + oldStat);
+ zkSetup.setUpTasks(data);
+ Stat newStat = zk.exists(tasksListRoot, false);
+ System.out.println("newstat=" + newStat);
+ myassert(oldStat.getMtime() == newStat.getMtime());
+
+ // make change to task config and try running again and verify new
+ // config is uploaded
+ oldStat = zk.exists(tasksListRoot, false);
+ System.out.println("oldStat=" + oldStat.getVersion());
+ ((Map<String, String>) data[data.length - 1]).put("name", "changedname");
+ zkSetup.setUpTasks(data);
+ newStat = zk.exists(tasksListRoot, false);
+ System.out.println("newstat=" + newStat.getVersion());
+ System.out.println();
+ myassert(oldStat.getMtime() != newStat.getMtime());
+
+ // ensure version change is working
+ zkSetup.setUpTasks("1.0.0.0", data);
+ }
+
+ private void myassert(boolean b) {
+ if (!b) {
+ throw new AssertionError();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java b/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
index df214c3..3bfcaa0 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
@@ -1,86 +1,86 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.tools;
-
-import org.apache.s4.comm.util.ConfigUtils;
-import org.apache.s4.comm.util.ConfigParser;
-import org.apache.s4.comm.util.ConfigParser.Cluster;
-import org.apache.s4.comm.util.ConfigParser.Config;
-import org.apache.s4.comm.zk.ZkTaskSetup;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class will set up initial tasks on the zookeeper USAGE: java AppTask
- * [clean] setup config.xml
- *
- * @author kishoreg
- *
- */
-public class TaskSetupApp {
- public static void main(String[] args) {
- String zkAddress = "";
- boolean clean = false;
- boolean setup = false;
- String setupXml = null;
- for (int i = 0; i < args.length; i++) {
- if (i == 0) {
- zkAddress = args[0];
- }
- if (args[i].equals("clean")) {
- clean = true;
- } else if (args[i].equals("setup")) {
- setup = true;
- } else if (i == args.length - 1) {
- setupXml = args[i];
- }
- }
- if (setupXml == null || !new File(setupXml).exists()) {
- printusage("Set up xml: " + setupXml + " does not exist");
- }
- if (!setup && !clean) {
- System.err.println("Invalid usage.");
- printusage("Must specify at least one of of clean, setup.");
- }
- doMain(zkAddress, clean, setup, setupXml);
- }
-
- private static void printusage(String message) {
- System.err.println(message);
- System.err.println("java TaskSetupApp <zk_address> [clean|setup] setup_config_xml");
- System.exit(1);
- }
-
- private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) {
- ConfigParser parser = new ConfigParser();
- Config config = parser.parse(setupXml);
- for (Cluster cluster : config.getClusters()) {
- processCluster(clean, zkAddress, cluster, config.getVersion());
- }
- }
-
- private static void processCluster(boolean clean, String zkAddress, Cluster cluster, String version) {
- List<Map<String,String>> clusterInfo = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false);
- ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType());
- if (clean) {
- zkSetup.cleanUp();
- }
-
- zkSetup.setUpTasks(version, clusterInfo.toArray());
- }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.tools;
+
+import org.apache.s4.comm.util.ConfigUtils;
+import org.apache.s4.comm.util.ConfigParser;
+import org.apache.s4.comm.util.ConfigParser.Cluster;
+import org.apache.s4.comm.util.ConfigParser.Config;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class will set up initial tasks on the zookeeper USAGE: java AppTask
+ * [clean] setup config.xml
+ *
+ * @author kishoreg
+ *
+ */
+public class TaskSetupApp {
+ public static void main(String[] args) {
+ String zkAddress = "";
+ boolean clean = false;
+ boolean setup = false;
+ String setupXml = null;
+ for (int i = 0; i < args.length; i++) {
+ if (i == 0) {
+ zkAddress = args[0];
+ }
+ if (args[i].equals("clean")) {
+ clean = true;
+ } else if (args[i].equals("setup")) {
+ setup = true;
+ } else if (i == args.length - 1) {
+ setupXml = args[i];
+ }
+ }
+ if (setupXml == null || !new File(setupXml).exists()) {
+ printusage("Set up xml: " + setupXml + " does not exist");
+ }
+ if (!setup && !clean) {
+ System.err.println("Invalid usage.");
+ printusage("Must specify at least one of of clean, setup.");
+ }
+ doMain(zkAddress, clean, setup, setupXml);
+ }
+
+ private static void printusage(String message) {
+ System.err.println(message);
+ System.err.println("java TaskSetupApp <zk_address> [clean|setup] setup_config_xml");
+ System.exit(1);
+ }
+
+ private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) {
+ ConfigParser parser = new ConfigParser();
+ Config config = parser.parse(setupXml);
+ for (Cluster cluster : config.getClusters()) {
+ processCluster(clean, zkAddress, cluster, config.getVersion());
+ }
+ }
+
+ private static void processCluster(boolean clean, String zkAddress, Cluster cluster, String version) {
+ List<Map<String,String>> clusterInfo = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false);
+ ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType());
+ if (clean) {
+ zkSetup.cleanUp();
+ }
+
+ zkSetup.setUpTasks(version, clusterInfo.toArray());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
index e440f10..98499a1 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
@@ -1,38 +1,38 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.util;
-
-import java.util.Map;
-
-public class CommUtil {
-
- public static boolean compareMaps(Map<String, Object> map1, Map<String, Object> map2) {
- boolean equals = true;
- if (map1.size() == map2.size()) {
- for (String key : map1.keySet()) {
- if (!(map2.containsKey(key) && map1.get(key)
- .equals(map2.get(key)))) {
- equals = false;
- break;
- }
- }
- } else {
- equals = false;
- }
- return equals;
- }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.util;
+
+import java.util.Map;
+
+public class CommUtil {
+
+ public static boolean compareMaps(Map<String, Object> map1, Map<String, Object> map2) {
+ boolean equals = true;
+ if (map1.size() == map2.size()) {
+ for (String key : map1.keySet()) {
+ if (!(map2.containsKey(key) && map1.get(key)
+ .equals(map2.get(key)))) {
+ equals = false;
+ break;
+ }
+ }
+ } else {
+ equals = false;
+ }
+ return equals;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
index 0e968d9..3b5caac 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
@@ -1,39 +1,39 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.util;
-
-public class SystemUtils {
-
- private SystemUtils() {
- }
-
- public static long getPID() {
- String processName = java.lang.management.ManagementFactory.getRuntimeMXBean()
- .getName();
- return Long.parseLong(processName.split("@")[0]);
- }
-
- public static void main(String[] args) {
- String msg = "My PID is " + SystemUtils.getPID();
-
- javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null,
- msg,
- "SystemUtils",
- javax.swing.JOptionPane.DEFAULT_OPTION);
-
- }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.util;
+
+public class SystemUtils {
+
+ private SystemUtils() {
+ }
+
+ public static long getPID() {
+ String processName = java.lang.management.ManagementFactory.getRuntimeMXBean()
+ .getName();
+ return Long.parseLong(processName.split("@")[0]);
+ }
+
+ public static void main(String[] args) {
+ String msg = "My PID is " + SystemUtils.getPID();
+
+ javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null,
+ msg,
+ "SystemUtils",
+ javax.swing.JOptionPane.DEFAULT_OPTION);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
index f5293b8..1588e54 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
@@ -1,64 +1,64 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.zk;
-
-public class ThreadTest {
- static Object lock = new Object();
-
- public static void main(String[] args) {
-
- Thread t1 = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- synchronized (lock) {
- System.out.println("In thread T1");
- try {
- System.out.println("Going to wait");
- long start = System.currentTimeMillis();
- lock.wait();
- long end = System.currentTimeMillis();
- System.out.println("Woke up T1 after :"
- + (end - start) / 1000 + "secs");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- });
- t1.start();
- Thread t2 = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- synchronized (lock) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- System.out.println("In thread T2");
- lock.notify();
- break;
- }
- }
- }
- });
- t2.start();
- }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+public class ThreadTest {
+ static Object lock = new Object();
+
+ public static void main(String[] args) {
+
+ Thread t1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ synchronized (lock) {
+ System.out.println("In thread T1");
+ try {
+ System.out.println("Going to wait");
+ long start = System.currentTimeMillis();
+ lock.wait();
+ long end = System.currentTimeMillis();
+ System.out.println("Woke up T1 after :"
+ + (end - start) / 1000 + "secs");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ });
+ t1.start();
+ Thread t2 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ synchronized (lock) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ System.out.println("In thread T2");
+ lock.notify();
+ break;
+ }
+ }
+ }
+ });
+ t2.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
index 0fe7fa4..7434901 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
@@ -1,147 +1,147 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.zk;
-
-import org.apache.s4.comm.core.CommEventCallback;
-import org.apache.s4.comm.core.DefaultWatcher;
-import org.apache.s4.comm.core.ProcessMonitor;
-import org.apache.s4.comm.util.JSONUtil;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkProcessMonitor extends DefaultWatcher implements Runnable,
- ProcessMonitor {
- static Logger logger = Logger.getLogger(ZkProcessMonitor.class);
- private List<Object> destinationList;
- private Map<Integer, Object> destinationMap;
- private String processZNode;
- private Object lock = new Object();
- private volatile boolean updateMode = false;
- private String taskZNode;
- private int taskCount;
-
- public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) {
- this(address, clusterName, clusterType, null);
- }
-
- public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType,
- CommEventCallback callbackHandler) {
- super(address, callbackHandler);
- String root = "/" + ClusterName + "/" + clusterType.toString();
- this.taskZNode = root + "/task";
- this.processZNode = root + "/process";
- destinationMap = new HashMap<Integer, Object>();
- destinationList = new ArrayList<Object>();
- }
-
- public void monitor() {
- synchronized (mutex) {
- readConfig();
- }
- new Thread(this).start();
- }
-
- private void readConfig() {
- try {
- synchronized (lock) {
- Map<Integer, Object> tempDestinationMap = new HashMap<Integer, Object>();
- List<Object> tempDestinationList = new ArrayList<Object>();
- updateMode = true;
- List<String> tasks = zk.getChildren(taskZNode, false);
- this.taskCount = tasks.size();
- List<String> children = zk.getChildren(processZNode, false);
- for (String name : children) {
- Stat stat = zk.exists(processZNode + "/" + name, false);
- if (stat != null) {
- byte[] data = zk.getData(processZNode + "/" + name,
- false,
- stat);
- Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(data));
- String key = (String) map.get("partition");
- if (key != null) {
- tempDestinationMap.put(Integer.parseInt(key), map);
- }
- tempDestinationList.add(map);
- }
- }
- destinationList.clear();
- destinationMap.clear();
- destinationList.addAll(tempDestinationList);
- destinationMap.putAll(tempDestinationMap);
- logger.info("Updated Destination List to" + destinationList);
- logger.info("Updated Destination Map to" + destinationMap);
- }
- } catch (KeeperException e) {
- logger.warn("Ignorable exception if it happens once in a while", e);
- } catch (InterruptedException e) {
- logger.error("Interrupted exception cause while reading process znode",
- e);
- } finally {
- updateMode = false;
- }
- }
-
- public void run() {
- try {
- while (true) {
- synchronized (mutex) {
- // set watch
- logger.info("Setting watch on " + processZNode);
- zk.getChildren(processZNode, true);
- readConfig();
- mutex.wait();
- }
- }
- } catch (KeeperException e) {
- logger.warn("KeeperException in ProcessMonitor.run", e);
- } catch (InterruptedException e) {
- logger.warn("InterruptedException in ProcessMonitor.run", e);
- }
- }
-
- public List<Object> getDestinationList() {
- if (updateMode) {
- synchronized (lock) {
- return destinationList;
- }
- } else {
- return destinationList;
- }
- }
-
- public Map<Integer, Object> getDestinationMap() {
- if (updateMode) {
- synchronized (lock) {
- return destinationMap;
- }
- } else {
- return destinationMap;
- }
- }
-
- @Override
- public int getTaskCount() {
- return taskCount;
- }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.core.ProcessMonitor;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkProcessMonitor extends DefaultWatcher implements Runnable,
+ ProcessMonitor {
+ static Logger logger = Logger.getLogger(ZkProcessMonitor.class);
+ private List<Object> destinationList;
+ private Map<Integer, Object> destinationMap;
+ private String processZNode;
+ private Object lock = new Object();
+ private volatile boolean updateMode = false;
+ private String taskZNode;
+ private int taskCount;
+
+ public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) {
+ this(address, clusterName, clusterType, null);
+ }
+
+ public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType,
+ CommEventCallback callbackHandler) {
+ super(address, callbackHandler);
+ String root = "/" + ClusterName + "/" + clusterType.toString();
+ this.taskZNode = root + "/task";
+ this.processZNode = root + "/process";
+ destinationMap = new HashMap<Integer, Object>();
+ destinationList = new ArrayList<Object>();
+ }
+
+ public void monitor() {
+ synchronized (mutex) {
+ readConfig();
+ }
+ new Thread(this).start();
+ }
+
+ private void readConfig() {
+ try {
+ synchronized (lock) {
+ Map<Integer, Object> tempDestinationMap = new HashMap<Integer, Object>();
+ List<Object> tempDestinationList = new ArrayList<Object>();
+ updateMode = true;
+ List<String> tasks = zk.getChildren(taskZNode, false);
+ this.taskCount = tasks.size();
+ List<String> children = zk.getChildren(processZNode, false);
+ for (String name : children) {
+ Stat stat = zk.exists(processZNode + "/" + name, false);
+ if (stat != null) {
+ byte[] data = zk.getData(processZNode + "/" + name,
+ false,
+ stat);
+ Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(data));
+ String key = (String) map.get("partition");
+ if (key != null) {
+ tempDestinationMap.put(Integer.parseInt(key), map);
+ }
+ tempDestinationList.add(map);
+ }
+ }
+ destinationList.clear();
+ destinationMap.clear();
+ destinationList.addAll(tempDestinationList);
+ destinationMap.putAll(tempDestinationMap);
+ logger.info("Updated Destination List to" + destinationList);
+ logger.info("Updated Destination Map to" + destinationMap);
+ }
+ } catch (KeeperException e) {
+ logger.warn("Ignorable exception if it happens once in a while", e);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted exception cause while reading process znode",
+ e);
+ } finally {
+ updateMode = false;
+ }
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ synchronized (mutex) {
+ // set watch
+ logger.info("Setting watch on " + processZNode);
+ zk.getChildren(processZNode, true);
+ readConfig();
+ mutex.wait();
+ }
+ }
+ } catch (KeeperException e) {
+ logger.warn("KeeperException in ProcessMonitor.run", e);
+ } catch (InterruptedException e) {
+ logger.warn("InterruptedException in ProcessMonitor.run", e);
+ }
+ }
+
+ public List<Object> getDestinationList() {
+ if (updateMode) {
+ synchronized (lock) {
+ return destinationList;
+ }
+ } else {
+ return destinationList;
+ }
+ }
+
+ public Map<Integer, Object> getDestinationMap() {
+ if (updateMode) {
+ synchronized (lock) {
+ return destinationMap;
+ }
+ } else {
+ return destinationMap;
+ }
+ }
+
+ @Override
+ public int getTaskCount() {
+ return taskCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
index 0b58018..1003baa 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
@@ -1,282 +1,282 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.zk;
-
-import org.apache.s4.comm.core.CommEventCallback;
-import org.apache.s4.comm.core.DefaultWatcher;
-import org.apache.s4.comm.util.CommUtil;
-import org.apache.s4.comm.util.JSONUtil;
-import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkTaskSetup extends DefaultWatcher {
- static Logger logger = Logger.getLogger(ZkTaskSetup.class);
- String tasksListRoot;
- String processListRoot;
-
- public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) {
- this(address, clusterName, clusterType, null);
- }
-
- /**
- * Constructor of ZkTaskSetup
- *
- * @param address
- * @param clusterName
- */
- public ZkTaskSetup(String address, String clusterName, ClusterType clusterType,
- CommEventCallback callbackHandler) {
- super(address, callbackHandler);
-
- this.root = "/" + clusterName + "/" + clusterType.toString();
- this.tasksListRoot = root + "/task";
- this.processListRoot = root + "/process";
- }
-
- public void setUpTasks(Object[] data) {
- setUpTasks("-1", data);
- }
-
- /**
- * Creates task nodes.
- *
- * @param version
- * @param data
- */
- public void setUpTasks(String version, Object[] data) {
- try {
- logger.info("Trying to set up configuration with new version:"
- + version);
- if (!version.equals("-1")) {
- if (!isConfigVersionNewer(version)) {
- logger.info("Config version not newer than current version");
- return;
- } else {
- cleanUp();
- }
- } else {
- logger.info("Not checking version number since it is set to -1");
- }
-
- // check if config data newer
- if (!isConfigDataNewer(data)) {
- logger.info("Config data not newer than current version");
- return;
- } else {
- logger.info("Found newer Config data. Cleaning old data");
- cleanUp();
- }
-
- // Create ZK node name
- if (zk != null) {
- Stat s;
- s = zk.exists(root, false);
- if (s == null) {
- String parent = new File(root).getParent()
- .replace(File.separatorChar,
- '/');
- if (logger.isDebugEnabled()) {
- logger.debug("parent:" + parent);
- }
- Stat exists = zk.exists(parent, false);
- if (exists == null) {
- zk.create(parent,
- new byte[0],
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- zk.create(root,
- new byte[0],
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- }
- Stat s;
- s = zk.exists(tasksListRoot, false);
- if (s == null) {
- Map<String, String> map = new HashMap<String, String>();
- map.put("config.version", version);
- String jsonString = JSONUtil.toJsonString(map);
- zk.create(tasksListRoot,
- jsonString.getBytes(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- s = zk.exists(processListRoot, false);
- if (s == null) {
- zk.create(processListRoot,
- new byte[0],
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- }
-
- for (int i = 0; i < data.length; i++) {
- String nodeName = tasksListRoot + "/" + "task" + "-" + i;
- Stat sTask = zk.exists(nodeName, false);
- if (sTask == null) {
- logger.info("Creating taskNode: " + nodeName);
- byte[] byteBuffer = JSONUtil.toJsonString(data[i])
- .getBytes();
- zk.create(nodeName,
- byteBuffer,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } else {
- logger.warn("TaskNode already exisits: " + nodeName);
- }
- }
- } catch (Exception e) {
- logger.error("Keeper exception when creating task nodes: "
- + e.toString(),
- e);
- throw new RuntimeException(e);
- }
- }
-
- private boolean isConfigDataNewer(Object[] data) {
- try {
- Stat s;
- s = zk.exists(tasksListRoot, false);
- if (s != null) {
- List<String> children = zk.getChildren(tasksListRoot, false);
- if (children.size() != data.length) {
- return true;
- }
- boolean[] matched = new boolean[data.length];
- for (String child : children) {
- String childPath = tasksListRoot + "/" + child;
- Stat sTemp = zk.exists(childPath, false);
- byte[] tempData = zk.getData(tasksListRoot + "/" + child,
- false,
- sTemp);
- Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
-
- // check if it matches any of the data
- for (int i = 0; i < data.length; i++) {
- Map<String, Object> newData = (Map<String, Object>) data[i];
- if (!matched[i] && CommUtil.compareMaps(newData, map)) {
- matched[i] = true;
- break;
- }
- }
- }
- for (int i = 0; i < matched.length; i++) {
- if (!matched[i]) {
- return true;
- }
- }
- } else {
- return true;
- }
- } catch (Exception e) {
- throw new RuntimeException(" Exception in isConfigDataNewer method ",
- e);
- }
- return false;
- }
-
- private boolean isConfigVersionNewer(String version) throws Exception {
- Stat s;
- s = zk.exists(tasksListRoot, false);
- if (s != null) {
- byte[] data = zk.getData(tasksListRoot, false, s);
- if (data != null && data.length > 0) {
- String jsonString = new String(data);
- if (jsonString != null) {
- Map<String, Object> map = JSONUtil.getMapFromJson(jsonString);
- if (map.containsKey("config.version")) {
- boolean update = false;
- String currentVersion = map.get("config.version")
- .toString();
- logger.info("Current config version:" + currentVersion);
- String[] curV = currentVersion.split("\\.");
- String[] newV = version.split("\\.");
- for (int i = 0; i < Math.max(curV.length, newV.length); i++) {
- if (Integer.parseInt(newV[i]) > Integer.parseInt(curV[i])) {
- update = true;
- break;
- }
- }
- if (!update) {
- logger.info("Current config version is newer. Config will not be updated");
- }
- return update;
- }
- } else {
- logger.info("No data at znode " + tasksListRoot
- + " so version checking will not be done");
- }
- } else {
- logger.info("No data at znode " + tasksListRoot
- + " so version checking will not be done");
- }
- } else {
- logger.info("znode " + tasksListRoot
- + " does not exist, so creating new one is fine");
- }
- return true;
- }
-
- /**
- * Will clean up taskList Node and process List Node
- */
- public boolean cleanUp() {
- try {
- logger.info("Cleaning :" + tasksListRoot);
- Stat exists = zk.exists(tasksListRoot, false);
- if (exists != null) {
- List<String> children = zk.getChildren(tasksListRoot, false);
- if (children.size() > 0) {
- for (String child : children) {
- logger.info("Cleaning :" + tasksListRoot + "/" + child);
- zk.delete(tasksListRoot + "/" + child, 0);
- }
- }
- zk.delete(tasksListRoot, 0);
- }
-
- exists = zk.exists(processListRoot, false);
- if (exists != null) {
- List<String> children = zk.getChildren(processListRoot, false);
- if (children.size() > 0) {
- logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior");
- for (String child : children) {
- logger.info("Cleaning :" + processListRoot + "/"
- + child);
- zk.delete(processListRoot + "/" + child, 0);
- }
- }
- logger.info("Finished cleaning :" + processListRoot);
- zk.delete(processListRoot, 0);
- }
- return true;
- } catch (Exception e) {
- logger.error("Exception while cleaning up: " + e.getMessage(), e);
- return false;
- }
- }
-
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.util.CommUtil;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkTaskSetup extends DefaultWatcher {
+ static Logger logger = Logger.getLogger(ZkTaskSetup.class);
+ String tasksListRoot;
+ String processListRoot;
+
+ public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) {
+ this(address, clusterName, clusterType, null);
+ }
+
+ /**
+ * Constructor of ZkTaskSetup
+ *
+ * @param address
+ * @param clusterName
+ */
+ public ZkTaskSetup(String address, String clusterName, ClusterType clusterType,
+ CommEventCallback callbackHandler) {
+ super(address, callbackHandler);
+
+ this.root = "/" + clusterName + "/" + clusterType.toString();
+ this.tasksListRoot = root + "/task";
+ this.processListRoot = root + "/process";
+ }
+
+ public void setUpTasks(Object[] data) {
+ setUpTasks("-1", data);
+ }
+
+ /**
+ * Creates task nodes.
+ *
+ * @param version
+ * @param data
+ */
+ public void setUpTasks(String version, Object[] data) {
+ try {
+ logger.info("Trying to set up configuration with new version:"
+ + version);
+ if (!version.equals("-1")) {
+ if (!isConfigVersionNewer(version)) {
+ logger.info("Config version not newer than current version");
+ return;
+ } else {
+ cleanUp();
+ }
+ } else {
+ logger.info("Not checking version number since it is set to -1");
+ }
+
+ // check if config data newer
+ if (!isConfigDataNewer(data)) {
+ logger.info("Config data not newer than current version");
+ return;
+ } else {
+ logger.info("Found newer Config data. Cleaning old data");
+ cleanUp();
+ }
+
+ // Create ZK node name
+ if (zk != null) {
+ Stat s;
+ s = zk.exists(root, false);
+ if (s == null) {
+ String parent = new File(root).getParent()
+ .replace(File.separatorChar,
+ '/');
+ if (logger.isDebugEnabled()) {
+ logger.debug("parent:" + parent);
+ }
+ Stat exists = zk.exists(parent, false);
+ if (exists == null) {
+ zk.create(parent,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ zk.create(root,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ }
+ Stat s;
+ s = zk.exists(tasksListRoot, false);
+ if (s == null) {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("config.version", version);
+ String jsonString = JSONUtil.toJsonString(map);
+ zk.create(tasksListRoot,
+ jsonString.getBytes(),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ s = zk.exists(processListRoot, false);
+ if (s == null) {
+ zk.create(processListRoot,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ }
+
+ for (int i = 0; i < data.length; i++) {
+ String nodeName = tasksListRoot + "/" + "task" + "-" + i;
+ Stat sTask = zk.exists(nodeName, false);
+ if (sTask == null) {
+ logger.info("Creating taskNode: " + nodeName);
+ byte[] byteBuffer = JSONUtil.toJsonString(data[i])
+ .getBytes();
+ zk.create(nodeName,
+ byteBuffer,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else {
+ logger.warn("TaskNode already exisits: " + nodeName);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Keeper exception when creating task nodes: "
+ + e.toString(),
+ e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean isConfigDataNewer(Object[] data) {
+ try {
+ Stat s;
+ s = zk.exists(tasksListRoot, false);
+ if (s != null) {
+ List<String> children = zk.getChildren(tasksListRoot, false);
+ if (children.size() != data.length) {
+ return true;
+ }
+ boolean[] matched = new boolean[data.length];
+ for (String child : children) {
+ String childPath = tasksListRoot + "/" + child;
+ Stat sTemp = zk.exists(childPath, false);
+ byte[] tempData = zk.getData(tasksListRoot + "/" + child,
+ false,
+ sTemp);
+ Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
+
+ // check if it matches any of the data
+ for (int i = 0; i < data.length; i++) {
+ Map<String, Object> newData = (Map<String, Object>) data[i];
+ if (!matched[i] && CommUtil.compareMaps(newData, map)) {
+ matched[i] = true;
+ break;
+ }
+ }
+ }
+ for (int i = 0; i < matched.length; i++) {
+ if (!matched[i]) {
+ return true;
+ }
+ }
+ } else {
+ return true;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(" Exception in isConfigDataNewer method ",
+ e);
+ }
+ return false;
+ }
+
+ private boolean isConfigVersionNewer(String version) throws Exception {
+ Stat s;
+ s = zk.exists(tasksListRoot, false);
+ if (s != null) {
+ byte[] data = zk.getData(tasksListRoot, false, s);
+ if (data != null && data.length > 0) {
+ String jsonString = new String(data);
+ if (jsonString != null) {
+ Map<String, Object> map = JSONUtil.getMapFromJson(jsonString);
+ if (map.containsKey("config.version")) {
+ boolean update = false;
+ String currentVersion = map.get("config.version")
+ .toString();
+ logger.info("Current config version:" + currentVersion);
+ String[] curV = currentVersion.split("\\.");
+ String[] newV = version.split("\\.");
+ for (int i = 0; i < Math.max(curV.length, newV.length); i++) {
+ if (Integer.parseInt(newV[i]) > Integer.parseInt(curV[i])) {
+ update = true;
+ break;
+ }
+ }
+ if (!update) {
+ logger.info("Current config version is newer. Config will not be updated");
+ }
+ return update;
+ }
+ } else {
+ logger.info("No data at znode " + tasksListRoot
+ + " so version checking will not be done");
+ }
+ } else {
+ logger.info("No data at znode " + tasksListRoot
+ + " so version checking will not be done");
+ }
+ } else {
+ logger.info("znode " + tasksListRoot
+ + " does not exist, so creating new one is fine");
+ }
+ return true;
+ }
+
+ /**
+ * Will clean up taskList Node and process List Node
+ */
+ public boolean cleanUp() {
+ try {
+ logger.info("Cleaning :" + tasksListRoot);
+ Stat exists = zk.exists(tasksListRoot, false);
+ if (exists != null) {
+ List<String> children = zk.getChildren(tasksListRoot, false);
+ if (children.size() > 0) {
+ for (String child : children) {
+ logger.info("Cleaning :" + tasksListRoot + "/" + child);
+ zk.delete(tasksListRoot + "/" + child, 0);
+ }
+ }
+ zk.delete(tasksListRoot, 0);
+ }
+
+ exists = zk.exists(processListRoot, false);
+ if (exists != null) {
+ List<String> children = zk.getChildren(processListRoot, false);
+ if (children.size() > 0) {
+ logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior");
+ for (String child : children) {
+ logger.info("Cleaning :" + processListRoot + "/"
+ + child);
+ zk.delete(processListRoot + "/" + child, 0);
+ }
+ }
+ logger.info("Finished cleaning :" + processListRoot);
+ zk.delete(processListRoot, 0);
+ }
+ return true;
+ } catch (Exception e) {
+ logger.error("Exception while cleaning up: " + e.getMessage(), e);
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
index c743db8..1ee2ccb 100644
--- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
@@ -1,144 +1,144 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm.zk;
-
-import org.apache.s4.comm.core.DefaultWatcher;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-public class ZkUtil extends DefaultWatcher {
-
- public ZkUtil(String address) {
- super(address);
-
- }
-
- public int getChildCount(String path) {
- try {
- List<String> children = zk.getChildren(path, false);
- return children.size();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public List<String> getChildren(String path) {
- try {
- List<String> children = zk.getChildren(path, false);
- return children;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public byte[] getData(String path) {
- try {
- byte[] data = zk.getData(path, false, null);
- return data;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- public void create(String path) {
- create(path, "");
- }
-
- public void create(String path, String data) {
- try {
- zk.create(path,
- data.getBytes(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- public void deleteRecursive(String path) {
- List<String> children = getChildren(path);
- for (String child : children) {
- deleteRecursive(path + "/" + child);
- }
- delete(path);
- }
-
- public void delete(String path) {
- try {
- zk.delete(path, -1);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length == 0) {
- printUsage();
-
- }
- String address = args[0];
- String methodName = args[1];
-
- String[] methodArgs = new String[args.length - 2];
- for (int i = 2; i < args.length; i++) {
- methodArgs[i - 2] = args[i];
- }
- Method[] methods = ZkUtil.class.getMethods();
- Method method = null;
- for (Method met : methods) {
- if (met.getName().equals(methodName)
- && met.getParameterTypes().length == methodArgs.length) {
- method = met;
- break;
- }
- }
-
- if (method != null) {
- ZkUtil zkUtil = new ZkUtil(address);
- Object ret = method.invoke(zkUtil, (Object[]) methodArgs);
- if (ret != null) {
- System.out.println("**********");
- System.out.println(ret);
- System.out.println("**********");
- }
- } else {
- printUsage();
- }
- // zkUtil.deleteRecursive("/s4/listener/process/task-0");
- // zkUtil.create("/s4_apps_test/sender/process");
- }
-
- private static void printUsage() {
- System.out.println("USAGE");
- System.out.println("java <zkadress> methodName arguments");
- Method[] methods = ZkUtil.class.getMethods();
- for (Method met : methods) {
- System.out.println(met.getName() + ":"
- + Arrays.toString(met.getParameterTypes()));
- }
- System.exit(1);
- }
-}
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.DefaultWatcher;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+public class ZkUtil extends DefaultWatcher {
+
+ public ZkUtil(String address) {
+ super(address);
+
+ }
+
+ public int getChildCount(String path) {
+ try {
+ List<String> children = zk.getChildren(path, false);
+ return children.size();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public List<String> getChildren(String path) {
+ try {
+ List<String> children = zk.getChildren(path, false);
+ return children;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public byte[] getData(String path) {
+ try {
+ byte[] data = zk.getData(path, false, null);
+ return data;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void create(String path) {
+ create(path, "");
+ }
+
+ public void create(String path, String data) {
+ try {
+ zk.create(path,
+ data.getBytes(),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void deleteRecursive(String path) {
+ List<String> children = getChildren(path);
+ for (String child : children) {
+ deleteRecursive(path + "/" + child);
+ }
+ delete(path);
+ }
+
+ public void delete(String path) {
+ try {
+ zk.delete(path, -1);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ printUsage();
+
+ }
+ String address = args[0];
+ String methodName = args[1];
+
+ String[] methodArgs = new String[args.length - 2];
+ for (int i = 2; i < args.length; i++) {
+ methodArgs[i - 2] = args[i];
+ }
+ Method[] methods = ZkUtil.class.getMethods();
+ Method method = null;
+ for (Method met : methods) {
+ if (met.getName().equals(methodName)
+ && met.getParameterTypes().length == methodArgs.length) {
+ method = met;
+ break;
+ }
+ }
+
+ if (method != null) {
+ ZkUtil zkUtil = new ZkUtil(address);
+ Object ret = method.invoke(zkUtil, (Object[]) methodArgs);
+ if (ret != null) {
+ System.out.println("**********");
+ System.out.println(ret);
+ System.out.println("**********");
+ }
+ } else {
+ printUsage();
+ }
+ // zkUtil.deleteRecursive("/s4/listener/process/task-0");
+ // zkUtil.create("/s4_apps_test/sender/process");
+ }
+
+ private static void printUsage() {
+ System.out.println("USAGE");
+ System.out.println("java <zkadress> methodName arguments");
+ Method[] methods = ZkUtil.class.getMethods();
+ for (Method met : methods) {
+ System.out.println(met.getName() + ":"
+ + Arrays.toString(met.getParameterTypes()));
+ }
+ System.exit(1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/resources/log4j.xml b/s4-comm/src/main/resources/log4j.xml
index 3e3c2ce..7a51b1e 100644
--- a/s4-comm/src/main/resources/log4j.xml
+++ b/s4-comm/src/main/resources/log4j.xml
@@ -1,33 +1,33 @@
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" >
- <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %5p %c{2}: %m%n" />
- </layout>
- </appender>
-
- <appender name="ASYNC" class="org.apache.log4j.AsyncAppender">
- <appender-ref ref="CONSOLE" />
- <appender-ref ref="FILE" />
- </appender>
-
- <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
- <param name="File" value="comm.log" />
- <param name="MaxFileSize" value="1MB" />
- <param name="MaxBackupIndex" value="100" />
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="[%d{dd/MM/yy hh:mm:ss:sss z}] %5p %c{2}: %m%n" />
- </layout>
- </appender>
-
- <logger name="zk">
- <level value="info"/>
- <appender-ref ref="CONSOLE" />
- </logger>
-<!--
- <root>
- <priority value="debug" />
- <appender-ref ref="CONSOLE" />
- <appender-ref ref="ASYNC" />
- </root>
- -->
-</log4j:configuration>
\ No newline at end of file
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" >
+ <!-- Timestamps use HH (24-hour clock) and SSS (milliseconds); the lower-case
+      forms hh/sss would print a 12-hour clock without AM/PM and repeat the seconds. -->
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{dd/MM/yy HH:mm:ss:SSS z}] %5p %c{2}: %m%n" />
+ </layout>
+ </appender>
+
+ <!-- Fans log events out to both the console and the rolling file appender. -->
+ <appender name="ASYNC" class="org.apache.log4j.AsyncAppender">
+ <appender-ref ref="CONSOLE" />
+ <appender-ref ref="FILE" />
+ </appender>
+
+ <!-- Rolling file: comm.log, rolled at 1MB, keeping up to 100 backups. -->
+ <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="comm.log" />
+ <param name="MaxFileSize" value="1MB" />
+ <param name="MaxBackupIndex" value="100" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{dd/MM/yy HH:mm:ss:SSS z}] %5p %c{2}: %m%n" />
+ </layout>
+ </appender>
+
+ <logger name="zk">
+ <level value="info"/>
+ <appender-ref ref="CONSOLE" />
+ </logger>
+<!--
+ <root>
+ <priority value="debug" />
+ <appender-ref ref="CONSOLE" />
+ <appender-ref ref="ASYNC" />
+ </root>
+ -->
+</log4j:configuration>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/sample_static_task_manager_test.xml
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/resources/sample_static_task_manager_test.xml b/s4-comm/src/main/resources/sample_static_task_manager_test.xml
index 11912aa..072735a 100644
--- a/s4-comm/src/main/resources/sample_static_task_manager_test.xml
+++ b/s4-comm/src/main/resources/sample_static_task_manager_test.xml
@@ -1,15 +1,15 @@
-<config>
- <list name="process.list">
- <item>s4_listener_process</item>
- </list>
- <hash name="s4_listener_process.config">
- <item name="num.process">2</item>
- <item name="app.name">s4</item>
- <item name="task.type">listener</item>
- <item name="mode">unicast</item>
- <item name="partition">0,1</item>
- <item name="port">5001,5002</item>
- <item name="process.host">halfalways-lx</item>
- <item name="ID">PROCESS_1,PROCESS_2</item>
- </hash>
-</config>
\ No newline at end of file
+<config>
+ <list name="process.list">
+ <item>s4_listener_process</item>
+ </list>
+ <hash name="s4_listener_process.config">
+ <item name="num.process">2</item>
+ <item name="app.name">s4</item>
+ <item name="task.type">listener</item>
+ <item name="mode">unicast</item>
+ <item name="partition">0,1</item>
+ <item name="port">5001,5002</item>
+ <item name="process.host">halfalways-lx</item>
+ <item name="ID">PROCESS_1,PROCESS_2</item>
+ </hash>
+</config>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/sample_task_setup.xml
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/resources/sample_task_setup.xml b/s4-comm/src/main/resources/sample_task_setup.xml
index b518913..bd59c77 100644
--- a/s4-comm/src/main/resources/sample_task_setup.xml
+++ b/s4-comm/src/main/resources/sample_task_setup.xml
@@ -1,23 +1,23 @@
-<config>
-
- <list name="tasks.list">
- <item>ep_tasks</item>
- </list>
- <!-- This version must be upped for every change -->
- <scalar name="ep_tasks.version">1.0.0.0</scalar>
- <scalar name="s4_tasks.version">1.0.0.0</scalar>
- <hash name="s4_tasks.config">
- <item name="num.tasks">2</item>
- <item name="app.name">s4</item>
- <item name="task.type">listener</item>
- <item name="mode">unicast</item>
- <item name="partition">0,1</item>
- <item name="port">5001,5002</item>
- </hash>
- <hash name="ep_tasks.config">
- <item name="num.tasks">1</item>
- <item name="app.name">s4_event_pipe</item>
- <item name="task.type">sender</item>
- <item name="mode">unicast</item>
- </hash>
-</config>
\ No newline at end of file
+<config>
+
+ <list name="tasks.list">
+ <item>ep_tasks</item>
+ </list>
+ <!-- This version must be upped for every change -->
+ <scalar name="ep_tasks.version">1.0.0.0</scalar>
+ <scalar name="s4_tasks.version">1.0.0.0</scalar>
+ <hash name="s4_tasks.config">
+ <item name="num.tasks">2</item>
+ <item name="app.name">s4</item>
+ <item name="task.type">listener</item>
+ <item name="mode">unicast</item>
+ <item name="partition">0,1</item>
+ <item name="port">5001,5002</item>
+ </hash>
+ <hash name="ep_tasks.config">
+ <item name="num.tasks">1</item>
+ <item name="app.name">s4_event_pipe</item>
+ <item name="task.type">sender</item>
+ <item name="mode">unicast</item>
+ </hash>
+</config>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/taskManagerTest.xml
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/resources/taskManagerTest.xml b/s4-comm/src/main/resources/taskManagerTest.xml
index 11912aa..072735a 100644
--- a/s4-comm/src/main/resources/taskManagerTest.xml
+++ b/s4-comm/src/main/resources/taskManagerTest.xml
@@ -1,15 +1,15 @@
-<config>
- <list name="process.list">
- <item>s4_listener_process</item>
- </list>
- <hash name="s4_listener_process.config">
- <item name="num.process">2</item>
- <item name="app.name">s4</item>
- <item name="task.type">listener</item>
- <item name="mode">unicast</item>
- <item name="partition">0,1</item>
- <item name="port">5001,5002</item>
- <item name="process.host">halfalways-lx</item>
- <item name="ID">PROCESS_1,PROCESS_2</item>
- </hash>
-</config>
\ No newline at end of file
+<config>
+ <list name="process.list">
+ <item>s4_listener_process</item>
+ </list>
+ <hash name="s4_listener_process.config">
+ <item name="num.process">2</item>
+ <item name="app.name">s4</item>
+ <item name="task.type">listener</item>
+ <item name="mode">unicast</item>
+ <item name="partition">0,1</item>
+ <item name="port">5001,5002</item>
+ <item name="process.host">halfalways-lx</item>
+ <item name="ID">PROCESS_1,PROCESS_2</item>
+ </hash>
+</config>