You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC
[24/50] [abbrv] Rename packages in preparation for move to Apache
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/util/JSONUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/JSONUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/util/JSONUtil.java
new file mode 100644
index 0000000..cf1d56c
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/JSONUtil.java
@@ -0,0 +1,285 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.util;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.Iterator;
+import java.util.ArrayList;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Static helpers that convert between plain Java values (maps, lists, simple
+ * beans) and org.json objects.
+ */
+public class JSONUtil {
+    /** Types that {@link #getMap(Object)} copies into the result map as-is. */
+    static Set<Class> knownTypes = new HashSet<Class>();
+    static {
+        knownTypes.add(String.class);
+        knownTypes.add(Double.class);
+        knownTypes.add(Integer.class);
+        knownTypes.add(Float.class);
+        knownTypes.add(Long.class);
+        knownTypes.add(Boolean.class);
+    }
+
+    /**
+     * Serializes an object to a compact JSON string by first turning it into
+     * a map with {@link #getMap(Object)}.
+     *
+     * @param obj a map or a bean; {@code null} yields an empty JSON object
+     * @return the JSON text
+     * @throws NullPointerException if {@link #toJSONObject(Map)} returns
+     *         {@code null} because a value was rejected by org.json
+     */
+    public static String toJsonString(Object obj) {
+        Map<String, Object> map = getMap(obj);
+        JSONObject jsonObject = toJSONObject(map);
+        return jsonObject.toString();
+    }
+
+    /**
+     * Parses a JSON object string into nested plain Java maps and lists.
+     *
+     * @param str JSON text of an object
+     * @return the parsed record, see {@link #getRawRecord(JSONObject)}
+     * @throws RuntimeException if the text cannot be parsed
+     */
+    public static Map<String, Object> getMapFromJson(String str) {
+        return getRawRecord(fromJsonString(str));
+    }
+
+    /**
+     * Copies a JSON object into a map, normalizing each value with
+     * {@link #fixValue(Object)}. A key whose value cannot be read is skipped.
+     *
+     * @param jsonRecord the JSON object to convert
+     * @return a new mutable map
+     */
+    public static Map<String, Object> getRawRecord(JSONObject jsonRecord) {
+        Map<String, Object> record = new HashMap<String, Object>();
+        for (Iterator it = jsonRecord.keys(); it.hasNext();) {
+            try {
+                String key = (String) it.next();
+                record.put(key, fixValue(jsonRecord.get(key)));
+            } catch (Exception e) {
+                // best effort: an unreadable entry is left out of the record
+                continue;
+            }
+        }
+        return record;
+    }
+
+    /**
+     * Copies a JSON array into a list of maps. An element that is not a JSON
+     * object is wrapped in a single-entry map under the key "value". An
+     * element that cannot be read is skipped.
+     *
+     * @param jsonList the JSON array to convert
+     * @return a new mutable list
+     */
+    @SuppressWarnings("unchecked")
+    public static List<Map<String, Object>> getRawList(JSONArray jsonList) {
+        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+        int length = jsonList.length();
+        for (int i = 0; i < length; i++) {
+            try {
+                Object value = fixValue(jsonList.get(i));
+                if (!(value instanceof Map)) {
+                    Map<String, Object> mapValue = new HashMap<String, Object>();
+                    mapValue.put("value", value);
+                    value = mapValue;
+                }
+                list.add((Map<String, Object>) value);
+            } catch (Exception e) {
+                // best effort: an unreadable element is left out of the list
+                continue;
+            }
+        }
+        return list;
+    }
+
+    /**
+     * Normalizes a value read from org.json: Float becomes Double, Integer
+     * becomes Long, JSONArray becomes a list and JSONObject becomes a map.
+     * Any other value is returned unchanged.
+     *
+     * @param originalValue the value to normalize, may be {@code null}
+     * @return the normalized value
+     */
+    public static Object fixValue(Object originalValue) {
+        if (originalValue instanceof Float) {
+            return Double.valueOf(((Float) originalValue).doubleValue());
+        }
+        if (originalValue instanceof Integer) {
+            return Long.valueOf(((Integer) originalValue).longValue());
+        }
+        if (originalValue instanceof JSONArray) {
+            return getRawList((JSONArray) originalValue);
+        }
+        if (originalValue instanceof JSONObject) {
+            return getRawRecord((JSONObject) originalValue);
+        }
+        return originalValue;
+    }
+
+    /**
+     * Parses JSON text into a {@link JSONObject}.
+     *
+     * @param str JSON text of an object
+     * @return the parsed object
+     * @throws RuntimeException wrapping the JSONException on malformed input
+     */
+    public static JSONObject fromJsonString(String str) {
+        try {
+            return new JSONObject(str);
+        } catch (JSONException e) {
+            // the cause travels with the exception; no separate stack dump
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds a {@link JSONObject} from a map, converting nested maps and
+     * lists recursively.
+     *
+     * @param map the map to convert
+     * @return the JSON object, or {@code null} if org.json rejects an entry
+     */
+    @SuppressWarnings("unchecked")
+    public static JSONObject toJSONObject(Map<String, Object> map) {
+        JSONObject jsonObject = new JSONObject();
+        try {
+            for (Map.Entry<String, Object> entry : map.entrySet()) {
+                String key = entry.getKey();
+                Object val = entry.getValue();
+                if (val instanceof Map) {
+                    jsonObject.put(key, toJSONObject((Map<String, Object>) val));
+                } else if (val instanceof List) {
+                    jsonObject.put(key, toJSONList((List) val));
+                } else {
+                    jsonObject.put(key, val);
+                }
+            }
+        } catch (JSONException je) {
+            je.printStackTrace();
+            return null;
+        }
+        return jsonObject;
+    }
+
+    /** Builds a JSONArray from a list, converting nested maps and lists. */
+    @SuppressWarnings("unchecked")
+    private static JSONArray toJSONList(List list) {
+        JSONArray arr = new JSONArray();
+        for (Object val : list) {
+            if (val instanceof Map) {
+                arr.put(toJSONObject((Map<String, Object>) val));
+            } else if (val instanceof List) {
+                arr.put(toJSONList((List) val));
+            } else {
+                arr.put(val);
+            }
+        }
+        return arr;
+    }
+
+    /**
+     * Turns an object into a map of its declared, non-static, non-transient
+     * fields. Primitive fields and fields whose declared type is in
+     * {@link #knownTypes} are copied directly, array fields become
+     * {@code Object[]} (a {@code null} array stays {@code null}), and every
+     * other field is converted recursively.
+     *
+     * @param obj the object to convert; when it already is a map, that same
+     *        instance is returned, not a copy
+     * @return the field map; empty when {@code obj} is {@code null}
+     * @throws RuntimeException if a field value cannot be read
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> getMap(Object obj) {
+        Map<String, Object> map = new HashMap<String, Object>();
+        if (obj == null) {
+            return map;
+        }
+        if (obj instanceof Map) {
+            return (Map) obj;
+        }
+
+        Field[] fields = obj.getClass().getDeclaredFields();
+        for (int i = 0; i < fields.length; i++) {
+            Field field = fields[i];
+            int modifiers = field.getModifiers();
+            // filter first so excluded fields are never opened up or read
+            if (Modifier.isStatic(modifiers) || Modifier.isTransient(modifiers)) {
+                continue;
+            }
+            if (!field.isAccessible()) {
+                field.setAccessible(true);
+            }
+            try {
+                String name = field.getName();
+                Object val = field.get(obj);
+                Class<?> type = field.getType();
+                if (type.isPrimitive() || knownTypes.contains(type)) {
+                    map.put(name, val);
+                } else if (type.isArray()) {
+                    map.put(name, val == null ? null : convertArray(val));
+                } else {
+                    map.put(name, getMap(val));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("Exception while getting value of "
+                        + field, e);
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Copies an array into an {@code Object[]}. Elements of primitive arrays,
+     * elements of a known type and {@code null} elements are kept as they
+     * are; any other element is converted with {@link #getMap(Object)}.
+     */
+    private static Object[] convertArray(Object array) {
+        // Array.get boxes primitives, so the element's own class is never
+        // primitive; the array's component type has to be consulted instead
+        boolean primitiveElements = array.getClass().getComponentType().isPrimitive();
+        int length = Array.getLength(array);
+        Object[] vals = new Object[length];
+        for (int j = 0; j < length; j++) {
+            Object element = Array.get(array, j);
+            if (element == null || primitiveElements
+                    || knownTypes.contains(element.getClass())) {
+                vals[j] = element;
+            } else {
+                vals[j] = getMap(element);
+            }
+        }
+        return vals;
+    }
+
+    /** Class name of a value for diagnostics, or "null" for a null value. */
+    private static String typeName(Object o) {
+        return o == null ? "null" : o.getClass().getName();
+    }
+
+    /**
+     * Manual smoke test: round-trips a nested sample map through JSON and
+     * prints any mismatch it finds.
+     */
+    public static void main(String[] args) {
+        Map<String, Object> outerMap = new HashMap<String, Object>();
+
+        outerMap.put("doubleValue", 0.3456d);
+        outerMap.put("integerValue", 175647);
+        outerMap.put("longValue", 0x0000005000067000L);
+        outerMap.put("stringValue", "Hello there");
+
+        Map<String, Object> innerMap = null;
+        List<Map<String, Object>> innerList1 = new ArrayList<Map<String, Object>>();
+
+        innerMap = new HashMap<String, Object>();
+        innerMap.put("name", "kishore");
+        innerMap.put("count", 1787265);
+        innerList1.add(innerMap);
+        innerMap = new HashMap<String, Object>();
+        innerMap.put("name", "fred");
+        innerMap.put("count", 11);
+        innerList1.add(innerMap);
+
+        outerMap.put("innerList1", innerList1);
+
+        List<Integer> innerList2 = new ArrayList<Integer>();
+        innerList2.add(65);
+        innerList2.add(2387894);
+        innerList2.add(456);
+
+        outerMap.put("innerList2", innerList2);
+
+        JSONObject jsonObject = toJSONObject(outerMap);
+
+        String flatJSONString = null;
+        try {
+            System.out.println(jsonObject.toString(3));
+            flatJSONString = jsonObject.toString();
+            Object o = jsonObject.get("innerList1");
+            if (!(o instanceof JSONArray)) {
+                System.out.println("Unexpected type of list " + typeName(o));
+            } else {
+                JSONArray jsonArray = (JSONArray) o;
+                o = jsonArray.get(0);
+                if (!(o instanceof JSONObject)) {
+                    System.out.println("Unexpected type of map " + typeName(o));
+                } else {
+                    JSONObject innerJSONObject = (JSONObject) o;
+                    System.out.println(innerJSONObject.get("name"));
+                }
+            }
+        } catch (JSONException je) {
+            je.printStackTrace();
+        }
+
+        if (flatJSONString == null) {
+            // serialization failed above; nothing left to compare or parse
+            System.out.println("Could not serialize the sample map");
+            return;
+        }
+
+        if (!flatJSONString.equals(toJsonString(outerMap))) {
+            System.out.println("JSON strings don't match!!");
+        }
+
+        Map<String, Object> map = getMapFromJson(flatJSONString);
+
+        Object o = map.get("doubleValue");
+        if (!(o instanceof Double)) {
+            System.out.println("Expected type Double, got " + typeName(o));
+        } else {
+            // only inspect the value once the type is known to be right
+            Double doubleValue = (Double) o;
+            if (doubleValue != 0.3456d) {
+                System.out.println("Expected 0.3456, got " + doubleValue);
+            }
+        }
+
+        o = map.get("innerList1");
+        if (!(o instanceof List)) {
+            System.out.println("Expected implementation of List, got "
+                    + typeName(o));
+        } else {
+            List innerList = (List) o;
+            o = innerList.get(0);
+            if (!(o instanceof Map)) {
+                System.out.println("Expected implementation of Map, got "
+                        + typeName(o));
+            } else {
+                innerMap = (Map) o;
+                System.out.println(innerMap.get("name"));
+            }
+        }
+        System.out.println(map);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
new file mode 100644
index 0000000..0e968d9
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.util;
+
+public class SystemUtils {
+
+ private SystemUtils() {
+ }
+
+ public static long getPID() {
+ String processName = java.lang.management.ManagementFactory.getRuntimeMXBean()
+ .getName();
+ return Long.parseLong(processName.split("@")[0]);
+ }
+
+ public static void main(String[] args) {
+ String msg = "My PID is " + SystemUtils.getPID();
+
+ javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null,
+ msg,
+ "SystemUtils",
+ javax.swing.JOptionPane.DEFAULT_OPTION);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
new file mode 100644
index 0000000..f5293b8
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+/**
+ * Scratch program exercising wait/notify on a shared monitor: T1 waits on
+ * the lock in an endless loop, T2 holds the lock for ten seconds and then
+ * notifies once.
+ */
+public class ThreadTest {
+    static Object lock = new Object();
+
+    /** Waits on the shared lock forever, reporting how long each wait took. */
+    private static class Waiter implements Runnable {
+        @Override
+        public void run() {
+            for (;;) {
+                synchronized (lock) {
+                    System.out.println("In thread T1");
+                    try {
+                        System.out.println("Going to wait");
+                        long before = System.currentTimeMillis();
+                        lock.wait();
+                        long after = System.currentTimeMillis();
+                        long seconds = (after - before) / 1000;
+                        System.out.println("Woke up T1 after :" + seconds
+                                + "secs");
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    /** Sleeps ten seconds while holding the shared lock, then notifies once. */
+    private static class Notifier implements Runnable {
+        @Override
+        public void run() {
+            // a single pass: the body always ends after the notify
+            synchronized (lock) {
+                try {
+                    Thread.sleep(10000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                System.out.println("In thread T2");
+                lock.notify();
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        Thread t1 = new Thread(new Waiter());
+        t1.start();
+        Thread t2 = new Thread(new Notifier());
+        t2.start();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
new file mode 100644
index 0000000..0fe7fa4
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.core.ProcessMonitor;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Tracks the process znodes of a cluster and exposes their JSON payloads as a
+ * destination list and a partition-to-destination map.
+ *
+ * Each refresh builds new collections and publishes them through volatile
+ * fields, so a collection handed out by a getter is never modified by this
+ * class afterwards. Callers must call the getters again to see an update.
+ */
+public class ZkProcessMonitor extends DefaultWatcher implements Runnable,
+        ProcessMonitor {
+    static Logger logger = Logger.getLogger(ZkProcessMonitor.class);
+    /** Latest snapshot of all process payloads; replaced, never mutated. */
+    private volatile List<Object> destinationList;
+    /** Latest snapshot keyed by the "partition" entry; replaced, never mutated. */
+    private volatile Map<Integer, Object> destinationMap;
+    private String processZNode;
+    /** Serializes refreshes so two readConfig calls cannot interleave. */
+    private Object lock = new Object();
+    private String taskZNode;
+    /** Number of children of the task znode seen by the last refresh. */
+    private volatile int taskCount;
+
+    public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) {
+        this(address, clusterName, clusterType, null);
+    }
+
+    /**
+     * @param address passed to the DefaultWatcher constructor
+     * @param ClusterName first path element of the watched znodes
+     * @param clusterType second path element of the watched znodes
+     * @param callbackHandler passed to the DefaultWatcher constructor, may be null
+     */
+    public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType,
+            CommEventCallback callbackHandler) {
+        super(address, callbackHandler);
+        String root = "/" + ClusterName + "/" + clusterType.toString();
+        this.taskZNode = root + "/task";
+        this.processZNode = root + "/process";
+        destinationMap = new HashMap<Integer, Object>();
+        destinationList = new ArrayList<Object>();
+    }
+
+    /**
+     * Loads the current configuration once on the calling thread and then
+     * starts a background thread that keeps it up to date.
+     */
+    public void monitor() {
+        synchronized (mutex) {
+            readConfig();
+        }
+        new Thread(this).start();
+    }
+
+    /**
+     * Reads the task count and every process znode, then publishes the new
+     * snapshots. If ZooKeeper fails part-way, the previous snapshots stay in
+     * place.
+     */
+    private void readConfig() {
+        try {
+            synchronized (lock) {
+                Map<Integer, Object> tempDestinationMap = new HashMap<Integer, Object>();
+                List<Object> tempDestinationList = new ArrayList<Object>();
+                List<String> tasks = zk.getChildren(taskZNode, false);
+                this.taskCount = tasks.size();
+                List<String> children = zk.getChildren(processZNode, false);
+                for (String name : children) {
+                    String path = processZNode + "/" + name;
+                    Stat stat = zk.exists(path, false);
+                    if (stat != null) {
+                        byte[] data = zk.getData(path, false, stat);
+                        Map<String, Object> map = JSONUtil.getMapFromJson(new String(data));
+                        // the value may come back as a string or as a number
+                        Object key = map.get("partition");
+                        if (key != null) {
+                            tempDestinationMap.put(Integer.parseInt(key.toString()), map);
+                        }
+                        tempDestinationList.add(map);
+                    }
+                }
+                // publish complete collections in one step instead of
+                // clearing and refilling the ones readers may be iterating
+                destinationList = tempDestinationList;
+                destinationMap = tempDestinationMap;
+                logger.info("Updated Destination List to" + tempDestinationList);
+                logger.info("Updated Destination Map to" + tempDestinationMap);
+            }
+        } catch (KeeperException e) {
+            logger.warn("Ignorable exception if it happens once in a while", e);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted exception cause while reading process znode",
+                         e);
+            // keep the interrupt visible to the caller's next blocking call
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Refresh loop: sets a child watch on the process znode, reloads the
+     * configuration and waits on the mutex until notified. Ends when
+     * ZooKeeper fails or the thread is interrupted.
+     */
+    public void run() {
+        try {
+            while (true) {
+                synchronized (mutex) {
+                    // set watch
+                    logger.info("Setting watch on " + processZNode);
+                    zk.getChildren(processZNode, true);
+                    readConfig();
+                    // presumably notified by the DefaultWatcher callback -- confirm
+                    mutex.wait();
+                }
+            }
+        } catch (KeeperException e) {
+            logger.warn("KeeperException in ProcessMonitor.run", e);
+        } catch (InterruptedException e) {
+            logger.warn("InterruptedException in ProcessMonitor.run", e);
+        }
+    }
+
+    /** @return the most recently published list of process payloads */
+    public List<Object> getDestinationList() {
+        return destinationList;
+    }
+
+    /** @return the most recently published map from partition id to payload */
+    public Map<Integer, Object> getDestinationMap() {
+        return destinationMap;
+    }
+
+    @Override
+    public int getTaskCount() {
+        return taskCount;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkQueue.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkQueue.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkQueue.java
new file mode 100644
index 0000000..8bac50b
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkQueue.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.util.IOUtil;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Producer-consumer queue stored as sequential child znodes of one root
+ * znode.
+ */
+public class ZkQueue extends DefaultWatcher {
+    /** Name prefix of every element znode; ZooKeeper appends the sequence. */
+    private static final String ELEMENT_PREFIX = "element";
+
+    /**
+     * Constructor of producer-consumer queue. Creates the root znode when it
+     * does not exist yet; a failure to do so is only printed.
+     *
+     * @param address passed to the DefaultWatcher constructor
+     * @param name path of the queue's root znode
+     */
+    public ZkQueue(String address, String name) {
+        super(address);
+        this.root = name;
+        // Create ZK node name
+        if (zk != null) {
+            try {
+                Stat s = zk.exists(root, false);
+                if (s == null) {
+                    zk.create(root,
+                              new byte[0],
+                              Ids.OPEN_ACL_UNSAFE,
+                              CreateMode.PERSISTENT);
+                }
+            } catch (KeeperException e) {
+                System.out.println("Keeper exception when instantiating queue: "
+                        + e.toString());
+            } catch (InterruptedException e) {
+                System.out.println("Interrupted exception");
+                // a constructor cannot rethrow it; keep the flag for the caller
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Add element to the queue.
+     *
+     * @param obj element to add
+     * @return always true; failure is reported by exception
+     * @throws KeeperException if the element znode cannot be created
+     * @throws InterruptedException if the calling thread is interrupted
+     */
+    public boolean produce(Object obj) throws KeeperException,
+            InterruptedException {
+        byte[] value = IOUtil.serializeToBytes(obj);
+        zk.create(root + "/" + ELEMENT_PREFIX,
+                  value,
+                  Ids.OPEN_ACL_UNSAFE,
+                  CreateMode.PERSISTENT_SEQUENTIAL);
+        return true;
+    }
+
+    /**
+     * Remove first element from the queue, blocking while the queue is empty.
+     * If another consumer removes the chosen element first, the next one is
+     * tried.
+     *
+     * @return first element from the queue
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public Object consume() throws KeeperException, InterruptedException {
+        // Get the first element available
+        while (true) {
+            synchronized (mutex) {
+                List<String> list = zk.getChildren(root, true);
+                if (list.isEmpty()) {
+                    System.out.println("Going to wait");
+                    // presumably notified by the DefaultWatcher callback -- confirm
+                    mutex.wait();
+                    continue;
+                }
+
+                String name = list.get(0);
+                int min = sequenceOf(name);
+                for (String s : list) {
+                    int candidate = sequenceOf(s);
+                    if (candidate < min) {
+                        min = candidate;
+                        name = s;
+                    }
+                }
+
+                String zNode = root + "/" + name;
+                System.out.println("Temporary value: " + zNode);
+                try {
+                    byte[] b = zk.getData(zNode, false, null);
+                    zk.delete(zNode, 0);
+                    return IOUtil.deserializeToObject(b);
+                } catch (KeeperException.NoNodeException e) {
+                    // another consumer took this element between the listing
+                    // and the read/delete; list again and try the next one
+                }
+            }
+        }
+    }
+
+    /** Parses the sequence number that follows the element prefix. */
+    private static int sequenceOf(String childName) {
+        return Integer.parseInt(childName.substring(ELEMENT_PREFIX.length()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskManager.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskManager.java
new file mode 100644
index 0000000..053571d
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskManager.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.core.TaskManager;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * TaskManager that claims a task by creating an ephemeral znode under the
+ * process root for one of the task znodes under the task root.
+ */
+public class ZkTaskManager extends DefaultWatcher implements TaskManager {
+ static Logger logger = Logger.getLogger(ZkTaskManager.class);
+ // path of the znode whose children are the configured tasks
+ String tasksListRoot;
+ // path of the znode whose children are the tasks currently claimed
+ String processListRoot;
+
+ public ZkTaskManager(String address, String ClusterName, ClusterType clusterType) {
+ this(address, ClusterName, clusterType, null);
+ }
+
+ /**
+ * Constructor of TaskManager. Derives the task and process roots as
+ * "/ClusterName/clusterType/task" and "/ClusterName/clusterType/process".
+ *
+ * @param address passed to the DefaultWatcher constructor
+ * @param ClusterName first path element of the znodes used
+ * @param clusterType second path element of the znodes used
+ * @param callbackHandler passed to the DefaultWatcher constructor, may be null
+ */
+ public ZkTaskManager(String address, String ClusterName, ClusterType clusterType,
+ CommEventCallback callbackHandler) {
+ super(address, callbackHandler);
+ this.root = "/" + ClusterName + "/" + clusterType.toString();
+ this.tasksListRoot = root + "/task";
+ this.processListRoot = root + "/process";
+ }
+
+ /**
+ * Blocks the calling thread until a task can be claimed, then returns the
+ * data stored in that task's znode. The returned map also carries the
+ * entries of customTaskData whose keys the task data does not already
+ * define, plus "taskSize", "tasksRootNode" and "processRootNode".
+ *
+ * The method waits while the task root or process root does not exist and
+ * while there are at least as many process znodes as task znodes.
+ * KeeperException and InterruptedException are logged and the attempt is
+ * repeated, so an interrupt does not end the call.
+ *
+ * @param customTaskData extra entries to merge into the result, may be null
+ * @return Object containing data related to the task
+ */
+ @Override
+ public Object acquireTask(Map<String, String> customTaskData) {
+ while (true) {
+ synchronized (mutex) {
+ try {
+ Stat tExists = zk.exists(tasksListRoot, false);
+ if (tExists == null) {
+ logger.error("Tasks znode:" + tasksListRoot
+ + " not setup.Going to wait");
+ // check again with a watch set; wait only if still missing
+ tExists = zk.exists(tasksListRoot, true);
+ if (tExists == null) {
+ // presumably notified by the DefaultWatcher callback -- confirm
+ mutex.wait();
+ }
+ continue;
+ }
+ Stat pExists = zk.exists(processListRoot, false);
+ if (pExists == null) {
+ logger.error("Process root znode:" + processListRoot
+ + " not setup.Going to wait");
+ // same check-then-wait as for the task root above
+ pExists = zk.exists(processListRoot, true);
+ if (pExists == null) {
+ mutex.wait();
+ }
+ continue;
+ }
+ // setting watch true to tasks node will trigger call back
+ // if there is any change to task node,
+ // this is useful to add additional tasks
+ List<String> tasks = zk.getChildren(tasksListRoot, true);
+ List<String> processes = zk.getChildren(processListRoot,
+ true);
+ if (processes.size() < tasks.size()) {
+ // candidate ids are "0".."n-1"; assumes task znodes are
+ // named "task-<i>" for exactly those ids -- TODO confirm
+ ArrayList<String> tasksAvailable = new ArrayList<String>();
+ for (int i = 0; i < tasks.size(); i++) {
+ tasksAvailable.add("" + i);
+ }
+ // NOTE(review): this null check is redundant, processes
+ // was already dereferenced by size() above
+ if (processes != null) {
+ for (String s : processes) {
+ // the id is the text after the first '-' in the name
+ String taskId = s.split("-")[1];
+ tasksAvailable.remove(taskId);
+ }
+ }
+ // try pick up a random task
+ Random random = new Random();
+ int id = Integer.parseInt(tasksAvailable.get(random.nextInt(tasksAvailable.size())));
+ String pNode = processListRoot + "/" + "task-" + id;
+ String tNode = tasksListRoot + "/" + "task-" + id;
+ Stat pNodeStat = zk.exists(pNode, false);
+ // if the process znode exists by now, fall through and
+ // start the next iteration of the outer loop
+ if (pNodeStat == null) {
+ Stat tNodeStat = zk.exists(tNode, false);
+ byte[] bytes = zk.getData(tNode, false, tNodeStat);
+ Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(bytes));
+ // if(!map.containsKey("address")){
+ // map.put("address",
+ // InetAddress.getLocalHost().getHostName());
+ // }
+ if (customTaskData != null) {
+ // entries already present in the task data win
+ for (String key : customTaskData.keySet()) {
+ if (!map.containsKey(key)) {
+ map.put(key, customTaskData.get(key));
+ }
+ }
+
+ }
+ map.put("taskSize", "" + tasks.size());
+ map.put("tasksRootNode", tasksListRoot);
+ map.put("processRootNode", processListRoot);
+ // the ephemeral znode marks the task as taken; a failed
+ // create is a KeeperException, handled by the catch below
+ String create = zk.create(pNode,
+ JSONUtil.toJsonString(map)
+ .getBytes(),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL);
+ logger.info("Created process Node:" + pNode + " :"
+ + create);
+ return map;
+ }
+ } else {
+ // all the tasks are taken up, will wait for the
+ logger.info("No task available to take up. Going to wait");
+ mutex.wait();
+ }
+ } catch (KeeperException e) {
+ logger.info("Warn:mostly ignorable " + e.getMessage(), e);
+ } catch (InterruptedException e) {
+ // the interrupt is only logged; the loop keeps trying
+ logger.info("Warn:mostly ignorable " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
new file mode 100644
index 0000000..0b58018
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.util.CommUtil;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkTaskSetup extends DefaultWatcher {
+ static Logger logger = Logger.getLogger(ZkTaskSetup.class);
+ String tasksListRoot;
+ String processListRoot;
+
+ public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) {
+ this(address, clusterName, clusterType, null);
+ }
+
+ /**
+ * Constructor of ZkTaskSetup
+ *
+ * @param address
+ * @param clusterName
+ */
+ public ZkTaskSetup(String address, String clusterName, ClusterType clusterType,
+ CommEventCallback callbackHandler) {
+ super(address, callbackHandler);
+
+ this.root = "/" + clusterName + "/" + clusterType.toString();
+ this.tasksListRoot = root + "/task";
+ this.processListRoot = root + "/process";
+ }
+
+ public void setUpTasks(Object[] data) {
+ setUpTasks("-1", data);
+ }
+
+ /**
+ * Creates task nodes.
+ *
+ * @param version
+ * @param data
+ */
+ public void setUpTasks(String version, Object[] data) {
+ try {
+ logger.info("Trying to set up configuration with new version:"
+ + version);
+ if (!version.equals("-1")) {
+ if (!isConfigVersionNewer(version)) {
+ logger.info("Config version not newer than current version");
+ return;
+ } else {
+ cleanUp();
+ }
+ } else {
+ logger.info("Not checking version number since it is set to -1");
+ }
+
+ // check if config data newer
+ if (!isConfigDataNewer(data)) {
+ logger.info("Config data not newer than current version");
+ return;
+ } else {
+ logger.info("Found newer Config data. Cleaning old data");
+ cleanUp();
+ }
+
+ // Create ZK node name
+ if (zk != null) {
+ Stat s;
+ s = zk.exists(root, false);
+ if (s == null) {
+ String parent = new File(root).getParent()
+ .replace(File.separatorChar,
+ '/');
+ if (logger.isDebugEnabled()) {
+ logger.debug("parent:" + parent);
+ }
+ Stat exists = zk.exists(parent, false);
+ if (exists == null) {
+ zk.create(parent,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ zk.create(root,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ }
+ Stat s;
+ s = zk.exists(tasksListRoot, false);
+ if (s == null) {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("config.version", version);
+ String jsonString = JSONUtil.toJsonString(map);
+ zk.create(tasksListRoot,
+ jsonString.getBytes(),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ s = zk.exists(processListRoot, false);
+ if (s == null) {
+ zk.create(processListRoot,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ }
+
+ for (int i = 0; i < data.length; i++) {
+ String nodeName = tasksListRoot + "/" + "task" + "-" + i;
+ Stat sTask = zk.exists(nodeName, false);
+ if (sTask == null) {
+ logger.info("Creating taskNode: " + nodeName);
+ byte[] byteBuffer = JSONUtil.toJsonString(data[i])
+ .getBytes();
+ zk.create(nodeName,
+ byteBuffer,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else {
+ logger.warn("TaskNode already exisits: " + nodeName);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Keeper exception when creating task nodes: "
+ + e.toString(),
+ e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean isConfigDataNewer(Object[] data) {
+ try {
+ Stat s;
+ s = zk.exists(tasksListRoot, false);
+ if (s != null) {
+ List<String> children = zk.getChildren(tasksListRoot, false);
+ if (children.size() != data.length) {
+ return true;
+ }
+ boolean[] matched = new boolean[data.length];
+ for (String child : children) {
+ String childPath = tasksListRoot + "/" + child;
+ Stat sTemp = zk.exists(childPath, false);
+ byte[] tempData = zk.getData(tasksListRoot + "/" + child,
+ false,
+ sTemp);
+ Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
+
+ // check if it matches any of the data
+ for (int i = 0; i < data.length; i++) {
+ Map<String, Object> newData = (Map<String, Object>) data[i];
+ if (!matched[i] && CommUtil.compareMaps(newData, map)) {
+ matched[i] = true;
+ break;
+ }
+ }
+ }
+ for (int i = 0; i < matched.length; i++) {
+ if (!matched[i]) {
+ return true;
+ }
+ }
+ } else {
+ return true;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(" Exception in isConfigDataNewer method ",
+ e);
+ }
+ return false;
+ }
+
+ private boolean isConfigVersionNewer(String version) throws Exception {
+ Stat s;
+ s = zk.exists(tasksListRoot, false);
+ if (s != null) {
+ byte[] data = zk.getData(tasksListRoot, false, s);
+ if (data != null && data.length > 0) {
+ String jsonString = new String(data);
+ if (jsonString != null) {
+ Map<String, Object> map = JSONUtil.getMapFromJson(jsonString);
+ if (map.containsKey("config.version")) {
+ boolean update = false;
+ String currentVersion = map.get("config.version")
+ .toString();
+ logger.info("Current config version:" + currentVersion);
+ String[] curV = currentVersion.split("\\.");
+ String[] newV = version.split("\\.");
+ for (int i = 0; i < Math.max(curV.length, newV.length); i++) {
+ if (Integer.parseInt(newV[i]) > Integer.parseInt(curV[i])) {
+ update = true;
+ break;
+ }
+ }
+ if (!update) {
+ logger.info("Current config version is newer. Config will not be updated");
+ }
+ return update;
+ }
+ } else {
+ logger.info("No data at znode " + tasksListRoot
+ + " so version checking will not be done");
+ }
+ } else {
+ logger.info("No data at znode " + tasksListRoot
+ + " so version checking will not be done");
+ }
+ } else {
+ logger.info("znode " + tasksListRoot
+ + " does not exist, so creating new one is fine");
+ }
+ return true;
+ }
+
+ /**
+ * Will clean up taskList Node and process List Node
+ */
+ public boolean cleanUp() {
+ try {
+ logger.info("Cleaning :" + tasksListRoot);
+ Stat exists = zk.exists(tasksListRoot, false);
+ if (exists != null) {
+ List<String> children = zk.getChildren(tasksListRoot, false);
+ if (children.size() > 0) {
+ for (String child : children) {
+ logger.info("Cleaning :" + tasksListRoot + "/" + child);
+ zk.delete(tasksListRoot + "/" + child, 0);
+ }
+ }
+ zk.delete(tasksListRoot, 0);
+ }
+
+ exists = zk.exists(processListRoot, false);
+ if (exists != null) {
+ List<String> children = zk.getChildren(processListRoot, false);
+ if (children.size() > 0) {
+ logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior");
+ for (String child : children) {
+ logger.info("Cleaning :" + processListRoot + "/"
+ + child);
+ zk.delete(processListRoot + "/" + child, 0);
+ }
+ }
+ logger.info("Finished cleaning :" + processListRoot);
+ zk.delete(processListRoot, 0);
+ }
+ return true;
+ } catch (Exception e) {
+ logger.error("Exception while cleaning up: " + e.getMessage(), e);
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
new file mode 100644
index 0000000..c743db8
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.DefaultWatcher;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Thin convenience wrapper for common znode operations (list, read, create,
+ * delete) that can also be driven from the command line through
+ * {@link #main(String[])}.
+ * <p>
+ * Every ZooKeeper failure is rethrown as an unchecked
+ * {@link RuntimeException} carrying the original exception as its cause.
+ */
+public class ZkUtil extends DefaultWatcher {
+
+    /**
+     * @param address
+     *            ZooKeeper address, handed unchanged to the
+     *            {@link DefaultWatcher} constructor
+     */
+    public ZkUtil(String address) {
+        super(address);
+    }
+
+    /**
+     * @param path
+     *            znode whose children are counted
+     * @return number of direct children of {@code path}
+     */
+    public int getChildCount(String path) {
+        return getChildren(path).size();
+    }
+
+    /**
+     * @param path
+     *            znode whose children are listed
+     * @return names of the direct children of {@code path}
+     */
+    public List<String> getChildren(String path) {
+        try {
+            return zk.getChildren(path, false);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @param path
+     *            znode to read
+     * @return raw data stored at {@code path}
+     */
+    public byte[] getData(String path) {
+        try {
+            return zk.getData(path, false, null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a persistent znode holding an empty string.
+     *
+     * @param path
+     *            znode to create
+     */
+    public void create(String path) {
+        create(path, "");
+    }
+
+    /**
+     * Creates a persistent znode with an open ACL.
+     *
+     * @param path
+     *            znode to create
+     * @param data
+     *            content to store in the znode
+     */
+    public void create(String path, String data) {
+        try {
+            // NOTE(review): getBytes() encodes with the platform default
+            // charset; confirm that readers of this data decode the same way.
+            zk.create(path,
+                      data.getBytes(),
+                      Ids.OPEN_ACL_UNSAFE,
+                      CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Deletes {@code path} and everything below it, children first.
+     *
+     * @param path
+     *            root of the subtree to remove
+     */
+    public void deleteRecursive(String path) {
+        for (String child : getChildren(path)) {
+            deleteRecursive(path + "/" + child);
+        }
+        delete(path);
+    }
+
+    /**
+     * Deletes a single znode regardless of its version (-1 matches any).
+     *
+     * @param path
+     *            znode to delete
+     */
+    public void delete(String path) {
+        try {
+            zk.delete(path, -1);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Command line entry point: {@code <zkaddress> <methodName> [args...]}.
+     * Looks up a public method by name whose parameters can all take a
+     * String, invokes it on a fresh instance and prints any non-null result.
+     * Prints the usage text and exits when fewer than two arguments are
+     * given or no suitable method exists.
+     */
+    public static void main(String[] args) throws Exception {
+        // Both the address and the method name are mandatory; checking only
+        // for an empty array would crash on a single argument.
+        if (args.length < 2) {
+            printUsage();
+            return;
+        }
+        String address = args[0];
+        String methodName = args[1];
+        String[] methodArgs = Arrays.copyOfRange(args, 2, args.length);
+
+        Method method = findMethod(methodName, methodArgs.length);
+        if (method == null) {
+            printUsage();
+            return;
+        }
+
+        ZkUtil zkUtil = new ZkUtil(address);
+        Object ret = method.invoke(zkUtil, (Object[]) methodArgs);
+        if (ret != null) {
+            System.out.println("**********");
+            System.out.println(ret);
+            System.out.println("**********");
+        }
+    }
+
+    /**
+     * Finds the first public method with the given name and arity whose
+     * every parameter accepts a String argument; returns null if none does.
+     */
+    private static Method findMethod(String name, int argCount) {
+        for (Method candidate : ZkUtil.class.getMethods()) {
+            if (!candidate.getName().equals(name)) {
+                continue;
+            }
+            Class<?>[] paramTypes = candidate.getParameterTypes();
+            if (paramTypes.length != argCount) {
+                continue;
+            }
+            boolean acceptsStrings = true;
+            for (Class<?> paramType : paramTypes) {
+                if (!paramType.isAssignableFrom(String.class)) {
+                    acceptsStrings = false;
+                    break;
+                }
+            }
+            if (acceptsStrings) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /** Prints the invocation syntax plus all public methods, then exits with status 1. */
+    private static void printUsage() {
+        System.out.println("USAGE");
+        System.out.println("java <zkadress> methodName arguments");
+        Method[] methods = ZkUtil.class.getMethods();
+        for (Method met : methods) {
+            System.out.println(met.getName() + ":"
+                    + Arrays.toString(met.getParameterTypes()));
+        }
+        System.exit(1);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/main/java/org/apache/s4/MainApp.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/MainApp.java b/s4-core/src/main/java/org/apache/s4/MainApp.java
index a88e7a5..dedc8d9 100644
--- a/s4-core/src/main/java/org/apache/s4/MainApp.java
+++ b/s4-core/src/main/java/org/apache/s4/MainApp.java
@@ -13,7 +13,7 @@
* language governing permissions and limitations under the
* License. See accompanying LICENSE file.
*/
-package io.s4;
+package org.apache.s4;
import org.apache.s4.ft.SafeKeeper;
import org.apache.s4.processor.AbstractPE;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java b/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
index bfb6eeb..5e4fad2 100644
--- a/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
+++ b/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
@@ -123,7 +123,7 @@ public class ObjectBuilder {
String[] query = { "name", "count", "freq" };
String target[] = { "ACDW", "11" };
- org.apache.s4.message.Request.ClientRInfo rinfo = new io.s4.message.Request.ClientRInfo();
+ org.apache.s4.message.Request.ClientRInfo rinfo = new org.apache.s4.message.Request.ClientRInfo();
rinfo.setRequesterUUID(UUID.randomUUID());
Request req = new org.apache.s4.message.SinglePERequest(Arrays.asList(target),
Arrays.asList(query),
@@ -131,7 +131,7 @@ public class ObjectBuilder {
System.out.println(req.toString());
- InstanceCreator<org.apache.s4.message.Request.RInfo> infoCreator = new InstanceCreator<io.s4.message.Request.RInfo>() {
+ InstanceCreator<org.apache.s4.message.Request.RInfo> infoCreator = new InstanceCreator<org.apache.s4.message.Request.RInfo>() {
public org.apache.s4.message.Request.RInfo createInstance(Type type) {
return new org.apache.s4.message.Request.ClientRInfo();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
index ae2faca..ca4d56f 100644
--- a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
@@ -84,12 +84,12 @@ public class DefaultPartitioner implements Partitioner, VariableKeyPartitioner {
// Some event types that need special handling
if (event instanceof org.apache.s4.message.Request) {
// construct key from request's target
- org.apache.s4.message.Request r = (io.s4.message.Request) event;
+ org.apache.s4.message.Request r = (org.apache.s4.message.Request) event;
return r.partition(hasher, delimiter, partitionCount);
} else if (event instanceof org.apache.s4.message.Response) {
// partition id is encoded in Response, so use it directly.
- org.apache.s4.message.Response r = (io.s4.message.Response) event;
+ org.apache.s4.message.Response r = (org.apache.s4.message.Response) event;
return r.partition(partitionCount);
} else if (compoundKeyNames == null) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
index ae877fb..0cca3b0 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
@@ -119,7 +119,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
* (non-Javadoc)
*
* @see
- * org.apache.s4.processor.AsynchronousEventProcessor#queueWork(io.s4.collector.
+ * org.apache.s4.processor.AsynchronousEventProcessor#queueWork(org.apache.s4.collector.
* EventWrapper)
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/CheckpointingTest.java b/s4-core/src/test/java/io/s4/ft/CheckpointingTest.java
deleted file mode 100644
index c52a8b4..0000000
--- a/s4-core/src/test/java/io/s4/ft/CheckpointingTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package io.s4.ft;
-
-import io.s4.serialize.KryoSerDeser;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.esotericsoftware.reflectasm.FieldAccess;
-
-/**
- * Checks that a checkpoint requested through an application event ends up
- * as a file under the local "tmp/storage" directory and that its content
- * equals the Kryo serialization of a reference PE.
- */
-public class CheckpointingTest extends S4TestCase {
-
- private static Factory zookeeperServerConnectionFactory = null;
- private S4App app;
-
- // Starts a ZooKeeper server and an in-process S4 app before each test.
- @Before
- public void prepare() throws Exception {
- zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
- app = new S4App(getClass(), "s4_core_conf_fs_backend.xml");
- app.initializeS4App();
- }
-
- // Stops the ZooKeeper server and closes the app context.
- // NOTE(review): testCheckpointStorage() already calls app.destroy(), so the
- // context is closed a second time here.
- @After
- public void cleanup() throws Exception {
- TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
- if (app!=null) {
- app.destroy();
- }
- }
-
- @Test
- public void testCheckpointStorage() throws Exception {
- final ZooKeeper zk = TestUtils.createZkClient();
-
-
-
- // 2. generate a simple event that creates the PE and changes its state
- // (step 1, starting the S4 app, is done in prepare())
-
- // NOTE: coordinate through zookeeper; the latches are presumably
- // released when the watched znodes get created - see TestUtils
- final CountDownLatch signalValue1Set = new CountDownLatch(1);
-
- TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
- final CountDownLatch signalCheckpointed = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/checkpointed",
- signalCheckpointed, zk);
- EventGenerator gen = new EventGenerator();
- gen.injectValueEvent(new KeyValue("value1", "message1"), "Stream1",
- 0);
-
- signalValue1Set.await();
- StatefulTestPE pe = (StatefulTestPE) S4TestCase.registeredPEs
- .get(new SafeKeeperId("statefulPE", "value"));
- Assert.assertEquals("message1", pe.getValue1());
- Assert.assertEquals("", pe.getValue2());
-
- // 3. generate a checkpoint event
- gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
- "Stream1", 0);
- signalCheckpointed.await();
-
- // NOTE: the backend has asynchronous save operations, hence the
- // fixed wait before looking at the file system
- Thread.sleep(1000);
-
- // expected location: <user.dir>/tmp/storage/<prototypeId>/<base64 of the id>
- SafeKeeperId safeKeeperId = pe.getSafeKeeperId();
- File expected = new File(System.getProperty("user.dir")
- + File.separator
- + "tmp"
- + File.separator
- + "storage"
- + File.separator
- + safeKeeperId.getPrototypeId()
- + File.separator
- + Base64.encodeBase64URLSafeString(safeKeeperId
- .getStringRepresentation().getBytes()));
-
- // 4. verify that state was correctly persisted
- Assert.assertTrue(expected.exists());
-
- // reference PE carrying the same state as the one that was checkpointed
- StatefulTestPE refPE = new StatefulTestPE();
- refPE.setValue1("message1");
- refPE.setId("statefulPE");
- refPE.setKeys(new String[] {});
- KryoSerDeser kryoSerDeser = new KryoSerDeser();
- byte[] refBytes = kryoSerDeser.serialize(refPE);
-
- Assert.assertTrue(Arrays.equals(refBytes,
- TestUtils.readFileAsByteArray(expected)));
-
- app.destroy();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/EventGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/EventGenerator.java b/s4-core/src/test/java/io/s4/ft/EventGenerator.java
deleted file mode 100644
index 66c5e79..0000000
--- a/s4-core/src/test/java/io/s4/ft/EventGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package io.s4.ft;
-
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.dispatcher.partitioner.KeyInfo;
-import io.s4.emitter.CommLayerEmitter;
-import io.s4.schema.Schema;
-import io.s4.serialize.KryoSerDeser;
-import io.s4.serialize.SerializerDeserializer;
-import io.s4.util.LoadGenerator;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class EventGenerator {
-
- private CommLayerEmitter eventEmitter;
-
- public EventGenerator() {
- SerializerDeserializer serDeser = new KryoSerDeser();
-
- eventEmitter = new CommLayerEmitter();
- eventEmitter.setAppName("s4");
- eventEmitter.setListenerAppName("s4");
- eventEmitter.setClusterManagerAddress("localhost");
- eventEmitter
- .setSenderId(String.valueOf(System.currentTimeMillis() / 1000));
- eventEmitter.setSerDeser(serDeser);
- eventEmitter.init();
-
- LoadGenerator generator = new LoadGenerator();
- generator.setEventEmitter(eventEmitter);
- }
-
- public void injectValueEvent(KeyValue keyValue, String streamName,
- int partitionId) throws JSONException {
-
- Schema schema = new Schema(KeyValue.class);
- JSONObject jsonRecord = new JSONObject("{key:" + keyValue.getKey()
- + ",value:" + keyValue.getValue() + "}");
- Object event = LoadGenerator.makeRecord(jsonRecord, schema);
- CompoundKeyInfo compoundKeyInfo = new CompoundKeyInfo();
- compoundKeyInfo.setCompoundKey("key");
- compoundKeyInfo.setCompoundValue("value");
- List<CompoundKeyInfo> compoundKeyInfos = new ArrayList<CompoundKeyInfo>();
- compoundKeyInfos.add(compoundKeyInfo);
- EventWrapper eventWrapper = new EventWrapper(streamName, event,
- compoundKeyInfos);
- eventEmitter.emit(partitionId, eventWrapper);
- }
-
- public void injectEvent(Object event, String streamName, int partitionId,
- List<CompoundKeyInfo> compoundKeyInfos) throws JSONException {
-
- EventWrapper eventWrapper = new EventWrapper(streamName, event,
- compoundKeyInfos);
- eventEmitter.emit(partitionId, eventWrapper);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/KeyValue.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/KeyValue.java b/s4-core/src/test/java/io/s4/ft/KeyValue.java
deleted file mode 100644
index c7ff9e3..0000000
--- a/s4-core/src/test/java/io/s4/ft/KeyValue.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package io.s4.ft;
-
-/**
- * Mutable key/value pair used as the event payload in the fault-tolerance
- * tests.
- */
-public class KeyValue {
-
-    String key;
-    String value;
-
-    /** Creates a pair with both fields left null. */
-    public KeyValue() {
-    }
-
-    /**
-     * @param key
-     *            initial key
-     * @param value
-     *            initial value
-     */
-    public KeyValue(String key, String value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    /** @return the current key, possibly null */
-    public String getKey() {
-        return this.key;
-    }
-
-    /** @param key the new key */
-    public void setKey(String key) {
-        this.key = key;
-    }
-
-    /** @return the current value, possibly null */
-    public String getValue() {
-        return this.value;
-    }
-
-    /** @param value the new value */
-    public void setValue(String value) {
-        this.value = value;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/RecoveryTest.java b/s4-core/src/test/java/io/s4/ft/RecoveryTest.java
deleted file mode 100644
index 177de92..0000000
--- a/s4-core/src/test/java/io/s4/ft/RecoveryTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package io.s4.ft;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Checks that a PE running in a forked S4 application is restored from its
- * last checkpoint after the application has been killed and restarted.
- */
-public class RecoveryTest extends S4TestCase {
-
- // NOTE(review): not referenced anywhere in this class
- public static long ZOOKEEPER_PORT = 21810;
- private Process forkedS4App = null;
- private static Factory zookeeperServerConnectionFactory = null;
-
- // Cleans temporary directories, starts a ZooKeeper server and removes the
- // coordination znodes a previous run may have left behind.
- @Before
- public void prepare() throws Exception {
- TestUtils.cleanupTmpDirs();
- zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
- final ZooKeeper zk = TestUtils.createZkClient();
- // each delete is best effort: failures (e.g. the znode is absent) are
- // deliberately ignored
- try {
- zk.delete("/value1Set", -1);
- } catch (Exception ignored) {
- }
- try {
- // FIXME can't figure out where this is retained
- zk.delete("/value2Set", -1);
- } catch (Exception ignored) {
- }
- try {
- // FIXME can't figure out where this is retained
- zk.delete("/checkpointed", -1);
- } catch (Exception ignored) {
- }
- zk.close();
- }
-
- // Stops the ZooKeeper server, then the forked application.
- @After
- public void cleanup() throws Exception {
- TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
- TestUtils.killS4App(forkedS4App);
- }
-
- @Test
- public void testCheckpointRestorationThroughApplicationEvent()
- throws Exception {
- final ZooKeeper zk = TestUtils.createZkClient();
- // 1. instantiate remote S4 app
- forkedS4App = TestUtils.forkS4App(getClass().getName(),
- "s4_core_conf_fs_backend.xml");
- // TODO synchro
- Thread.sleep(4000);
-
- CountDownLatch signalValue1Set = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
-
- // 2. generate a simple event that sets value1 on the PE; this is
- // the state that gets checkpointed in the next step.
- // (The event that triggers recovery is sent later, after the
- // restart: it sets value2, so the recovered PE should end up
- // with value1 from the checkpoint and value2 from that
- // event.)
- //
- EventGenerator gen = new EventGenerator();
- gen.injectValueEvent(new KeyValue("value1", "message1"), "Stream1", 0);
- signalValue1Set.await();
- final CountDownLatch signalCheckpointed = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed,
- zk);
- // trigger checkpoint
- gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
- "Stream1", 0);
- signalCheckpointed.await();
- // signalCheckpointAddedByBK.await();
-
- // change value1 again after the checkpoint; this newer value is not
- // expected to survive the restart
- signalValue1Set = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
- gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
- signalValue1Set.await();
- Assert.assertEquals("value1=message1b ; value2=",
- TestUtils.readFile(StatefulTestPE.DATA_FILE));
-
- Thread.sleep(2000);
- // kill app
- forkedS4App.destroy();
- // S4App.killS4App(getClass().getName());
-
- // remove the data file so the final assertion only sees what is
- // written after the restart
- StatefulTestPE.DATA_FILE.delete();
-
- forkedS4App = TestUtils.forkS4App(getClass().getName(),
- "s4_core_conf_fs_backend.xml");
- // TODO synchro
- Thread.sleep(2000);
- // trigger recovery by sending application event to set value 2
- CountDownLatch signalValue2Set = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
-
- gen.injectValueEvent(new KeyValue("value2", "message2"), "Stream1", 0);
- // NOTE(review): the boolean result of await is ignored, so a timeout
- // only shows up through the assertion below
- signalValue2Set.await(10, TimeUnit.SECONDS);
-
- // we should get "message1" (checkpointed) instead of "message1b"
- // (latest)
- Assert.assertEquals("value1=message1 ; value2=message2",
- TestUtils.readFile(StatefulTestPE.DATA_FILE));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/S4App.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/S4App.java b/s4-core/src/test/java/io/s4/ft/S4App.java
deleted file mode 100644
index 749a771..0000000
--- a/s4-core/src/test/java/io/s4/ft/S4App.java
+++ /dev/null
@@ -1,201 +0,0 @@
-package io.s4.ft;
-
-import io.s4.processor.AbstractPE;
-import io.s4.processor.PEContainer;
-import io.s4.util.Watcher;
-import io.s4.util.clock.Clock;
-import io.s4.util.clock.EventClock;
-
-import java.io.File;
-import java.io.FileDescriptor;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringWriter;
-import java.lang.management.ManagementFactory;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
-import org.springframework.core.io.ClassPathResource;
-
-/**
- * Test harness that boots an S4 application from Spring XML files located
- * next to a test class under src/test/java, either in-process or as the main
- * class of a separate JVM (see {@link #main(String[])}).
- */
-public class S4App {
-
- String configType = "typical";
- long seedTime = 0;
- ApplicationContext appContext = null;
- ApplicationContext adapterContext = null;
- private String configBase;
- boolean configPathsInitialized = false;
- private String[] coreConfigFileUrls;
- private Class testClass;
- private String s4CoreConfFileName;
- public static File DEFAULT_TEST_OUTPUT_DIR = new File(
- System.getProperty("user.dir") + File.separator + "tmp");
- public static File DEFAULT_STORAGE_DIR = new File(
- DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
- + "storage");
-
- public static String lockDirPath = System.getProperty("user.dir")
- + File.separator + "tmp" + File.separator + "lock";
-
- private S4App() {}
-
- /**
- * @param testClass
- *            class whose package determines the directory the
- *            configuration files are read from
- * @param s4CoreConfFileName
- *            name of the core configuration file in that directory
- */
- public S4App(Class testClass, String s4CoreConfFileName) throws Exception {
- this.testClass = testClass;
- this.s4CoreConfFileName = s4CoreConfFileName;
- initConfigPaths(testClass, s4CoreConfFileName);
- }
- /**
- * Starts an S4 application in the current JVM.
- *
- * @param args
- *            args[0]: fully qualified test class name, args[1]: core
- *            configuration file name
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- Class testClass = Class.forName(args[0]);
- String s4CoreConfFile = args[1];
- S4App app = new S4App(testClass, s4CoreConfFile);
- S4TestCase.initS4Parameters();
- app.initializeS4App();
-
- }
-
- /**
- * Performs dependency injection and starts the S4 platform: loads the
- * core context, then app_conf.xml on top of it, and registers every
- * AbstractPE bean with the PE container.
- */
- public void initializeS4App()
- throws Exception {
- // no-op when the constructor already resolved the paths
- initConfigPaths(testClass, s4CoreConfFileName);
- ApplicationContext coreContext = null;
-
- coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls,
- coreContext);
- ApplicationContext context = coreContext;
-
- // seed the event clock only when a positive seed time was set
- Clock clock = (Clock) context.getBean("clock");
- if (clock instanceof EventClock && seedTime > 0) {
- EventClock s4EventClock = (EventClock) clock;
- s4EventClock.updateTime(seedTime);
- System.out.println("Intializing event clock time with seed time "
- + s4EventClock.getCurrentTime());
- }
-
- PEContainer peContainer = (PEContainer) context.getBean("peContainer");
-
- Watcher w = (Watcher) context.getBean("watcher");
- w.setConfigFilename(configBase + s4CoreConfFileName);
-
- // load extension modules
- // String[] configFileNames = getModuleConfigFiles(extsHome, prop);
- // if (configFileNames.length > 0) {
- // String[] configFileUrls = new String[configFileNames.length];
- // for (int i = 0; i < configFileNames.length; i++) {
- // configFileUrls[i] = "file:" + configFileNames[i];
- // }
- // context = new FileSystemXmlApplicationContext(configFileUrls,
- // context);
- // }
-
- // load application modules
- String applicationConfigFileName = configBase + "app_conf.xml";
- String[] configFileUrls = new String[] { "file:"
- + applicationConfigFileName };
- context = new FileSystemXmlApplicationContext(configFileUrls, context);
- // attach any beans that implement ProcessingElement to the PE
- // Container
- String[] processingElementBeanNames = context
- .getBeanNamesForType(AbstractPE.class);
- for (String processingElementBeanName : processingElementBeanNames) {
- Object bean = context.getBean(processingElementBeanName);
- try {
- Method getS4ClockMethod = bean.getClass().getMethod(
- "getClock");
-
- // inject the core clock only into PEs that expose a Clock-typed
- // getter and do not have a clock yet
- if (getS4ClockMethod.getReturnType().equals(Clock.class)) {
- if (getS4ClockMethod.invoke(bean) == null) {
- Method setS4ClockMethod = bean.getClass().getMethod(
- "setClock", Clock.class);
- setS4ClockMethod.invoke(bean,
- coreContext.getBean("clock"));
- }
- }
- // NOTE(review): this call sits inside the try block, so it is skipped
- // whenever one of the getMethod lookups above throws
- ((AbstractPE)bean).setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
- } catch (NoSuchMethodException mnfe) {
- // acceptable
- }
- System.out.println("Adding processing element with bean name "
- + processingElementBeanName + ", id "
- + ((AbstractPE) bean).getId());
- peContainer.addProcessor((AbstractPE) bean);
- }
-
- appContext = context;
- }
-
-
-
- /**
- * Resolves the configuration directory and the list of core
- * configuration file URLs. Runs only once per instance. Terminates the
- * JVM with status 12 when s4_core.properties is not on the classpath and
- * with status 13 when the core configuration file does not exist.
- */
- private void initConfigPaths(Class testClass, String s4CoreConfFileName)
- throws IOException {
- if (!configPathsInitialized) {
- S4TestCase.initS4Parameters();
- String testDir = testClass.getPackage().getName()
- .replace('.', File.separatorChar);
-
- ClassPathResource propResource = new ClassPathResource(
- "s4_core.properties");
- Properties prop = new Properties();
- if (propResource.exists()) {
- prop.load(propResource.getInputStream());
- } else {
- System.err
- .println("Unable to find s4_core.properties. It must be available in classpath");
- Thread.dumpStack();
- System.exit(12);
- }
-
- // <user.dir>/src/test/java/<package path of the test class>/
- configBase = System.getProperty("user.dir") + File.separator
- + "src" + File.separator + "test" + File.separator + "java"
- + File.separator + testDir + File.separator;
- // wall_clock.xml is always the first core configuration file
- String configPath = configBase + File.separatorChar
- + "wall_clock.xml";
- List<String> coreConfigUrls = new ArrayList<String>();
- coreConfigUrls.add(configPath);
-
- // load core config xml
- if (s4CoreConfFileName != null) {
- // may be null for adapter
- configPath = configBase + s4CoreConfFileName;
- File configFile = new File(configPath);
- if (!configFile.exists()) {
- System.err.printf(
- "S4 core config file %s does not exist\n",
- configPath);
- Thread.dumpStack();
- System.exit(13);
- }
- coreConfigUrls.add(configPath);
- }
- String[] coreConfigFiles = new String[coreConfigUrls.size()];
- coreConfigUrls.toArray(coreConfigFiles);
-
- // prefix every path with "file:" to obtain the resource locations
- coreConfigFileUrls = new String[coreConfigFiles.length];
- for (int i = 0; i < coreConfigFiles.length; i++) {
- coreConfigFileUrls[i] = "file:" + coreConfigFiles[i];
- }
- configPathsInitialized = true;
-
- }
- }
-
- /**
- * Closes the application context created by initializeS4App(). Fails
- * with a NullPointerException when that method has not been called,
- * because appContext is still null then.
- */
- public void destroy() {
- ((FileSystemXmlApplicationContext)appContext).close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/S4TestCase.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/S4TestCase.java b/s4-core/src/test/java/io/s4/ft/S4TestCase.java
deleted file mode 100644
index 9d7b5ab..0000000
--- a/s4-core/src/test/java/io/s4/ft/S4TestCase.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package io.s4.ft;
-
-import io.s4.processor.AbstractPE;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Hashtable;
-import java.util.Map;
-
-import org.junit.BeforeClass;
-import org.springframework.context.ApplicationContext;
-
-public class S4TestCase {
-
- String configType = "typical";
- long seedTime = 0;
- ApplicationContext appContext = null;
- ApplicationContext adapterContext = null;
- boolean configPathsInitialized = false;
- public static File DEFAULT_TEST_OUTPUT_DIR = new File(
- System.getProperty("user.dir") + File.separator + "tmp");
- public static File DEFAULT_STORAGE_DIR = new File(
- DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
- + "storage");
- // use a static map to track PE instances
- public static final Map<Object, AbstractPE> registeredPEs = new Hashtable<Object, AbstractPE>();
-
-
- @BeforeClass
- public static void cleanLocks() {
- TestUtils.cleanupTmpDirs();
- }
-
-
- @BeforeClass
- public static void initS4Parameters() throws IOException {
-
- System.setProperty("commlayer_mode", "static");
- System.setProperty("commlayer.mode", "static");
- System.setProperty("DequeueCount", "6");
- System.setProperty("lock_dir", S4App.lockDirPath);
- File lockDir = new File(S4App.lockDirPath);
- if (!lockDir.exists()) {
- if (!lockDir.mkdirs()) {
- throw new RuntimeException("Cannot create directory: ["+lockDir.getAbsolutePath()+"]");
- }
- } else {
- TestUtils.deleteDirectoryContents(lockDir);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/SimpleEventProducer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/SimpleEventProducer.java b/s4-core/src/test/java/io/s4/ft/SimpleEventProducer.java
deleted file mode 100644
index 002d181..0000000
--- a/s4-core/src/test/java/io/s4/ft/SimpleEventProducer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package io.s4.ft;
-
-import io.s4.collector.EventWrapper;
-import io.s4.listener.EventHandler;
-import io.s4.listener.EventProducer;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class SimpleEventProducer implements EventProducer {
-
- private Set<io.s4.listener.EventHandler> handlers = new HashSet<io.s4.listener.EventHandler>();
- private String streamName;
-
- LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
-
- public void init() {
- }
-
- @Override
- public void addHandler(EventHandler handler) {
- handlers.add(handler);
-
- }
-
- @Override
- public boolean removeHandler(EventHandler handler) {
- return handlers.remove(handler);
- }
-
- public void setStreamName(String streamName) {
- this.streamName = streamName;
- }
-
- public String getStreamName() {
- return streamName;
- }
-
- // TODO JSON-like stuff
- public void produceEvent(String message) {
- EventWrapper ew = new EventWrapper(streamName, message, null);
- for (io.s4.listener.EventHandler handler : handlers) {
- try {
- handler.processEvent(ew);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/StatefulTestPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/StatefulTestPE.java b/s4-core/src/test/java/io/s4/ft/StatefulTestPE.java
deleted file mode 100644
index b4af477..0000000
--- a/s4-core/src/test/java/io/s4/ft/StatefulTestPE.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package io.s4.ft;
-
-import io.s4.processor.AbstractPE;
-
-import java.io.File;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-
-public class StatefulTestPE extends AbstractPE implements Watcher {
-
- String id;
- String value1 = "";
- String value2 = "";
- transient ZooKeeper zk = null;
- transient public static File DATA_FILE = new File(
- System.getProperty("user.dir")
- + File.separator + "tmp" + File.separator + "StatefulTestPE.data");;
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public void output() {
- // TODO Auto-generated method stub
-
- }
-
- public void processEvent(KeyValue event) {
- if (zk == null) {
- try {
- zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- if (!S4TestCase.registeredPEs.containsKey(getSafeKeeperId())) {
- S4TestCase.registeredPEs.put(getSafeKeeperId(), this);
- }
- try {
-
- if ("value1".equals(event.getKey())) {
- setValue1(event.getValue());
- zk.create("/value1Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } else if ("value2".equals(event.getKey())) {
- setValue2(event.getValue());
- zk.create("/value2Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } else if ("initiateCheckpoint".equals(event.getKey())) {
- initiateCheckpoint();
- } else {
- throw new RuntimeException("unidentified event: " + event);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- public String getValue1() {
- return value1;
- }
-
- public void setValue1(String value1) {
- this.value1 = value1;
- persistValues();
- }
-
- public String getValue2() {
- return value2;
- }
-
- public void setValue2(String value2) {
- this.value2 = value2;
- persistValues();
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- protected void checkpoint() {
- super.checkpoint();
- try {
- zk.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- // NOTE: we use a file as a simple way to keep track of changes
- private void persistValues() {
-
- if (DATA_FILE.exists()) {
- if (!DATA_FILE.delete()) {
- throw new RuntimeException("Cannot delete datafile "
- + DATA_FILE.getAbsolutePath());
- }
- }
- try {
- if (!DATA_FILE.createNewFile()) {
- throw new RuntimeException("Cannot create datafile "
- + DATA_FILE.getAbsolutePath());
- }
- } catch (IOException e) {
- throw new RuntimeException("Cannot create datafile "
- + DATA_FILE.getAbsolutePath());
- }
- try {
- TestUtils.writeStringToFile("value1=" + value1 + " ; value2=" + value2,
- DATA_FILE);
- } catch (IOException e) {
- throw new RuntimeException("Cannot write to datafile "
- + DATA_FILE.getAbsolutePath());
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- // TODO Auto-generated method stub
-
- }
-
-}