You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC

[24/50] [abbrv] Rename packages in preparation for move to Apache

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/util/JSONUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/JSONUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/util/JSONUtil.java
new file mode 100644
index 0000000..cf1d56c
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/JSONUtil.java
@@ -0,0 +1,285 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.util;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.Iterator;
+import java.util.ArrayList;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class JSONUtil {
+    /**
+     * Field types whose values {@link #getMap(Object)} copies into its result
+     * unchanged instead of expanding them reflectively.
+     */
+    static Set<Class<?>> knownTypes = new HashSet<Class<?>>();
+    static {
+        knownTypes.add(String.class);
+        knownTypes.add(Double.class);
+        knownTypes.add(Integer.class);
+        knownTypes.add(Float.class);
+        knownTypes.add(Long.class);
+        knownTypes.add(Boolean.class);
+    }
+
+    /**
+     * Serializes an object to its compact JSON text.
+     * <p>
+     * The object is first turned into a map by {@link #getMap(Object)} and
+     * that map is then converted by {@link #toJSONObject(Map)}.
+     *
+     * @param obj a map, an arbitrary object, or null
+     * @return the JSON text of the converted object
+     */
+    public static String toJsonString(Object obj) {
+        return toJSONObject(getMap(obj)).toString();
+    }
+
+    public static Map<String, Object> getMapFromJson(String str) {
+        return getRawRecord(fromJsonString(str));
+    }
+
+    /**
+     * Copies the entries of a JSON object into a new {@link HashMap}, passing
+     * every value through {@link #fixValue(Object)}.
+     * <p>
+     * An entry whose retrieval or conversion throws is silently left out.
+     *
+     * @param jsonRecord the JSON object to convert
+     * @return a new map holding the converted entries
+     */
+    public static Map<String, Object> getRawRecord(JSONObject jsonRecord) {
+        Map<String, Object> result = new HashMap<String, Object>();
+        Iterator keys = jsonRecord.keys();
+        while (keys.hasNext()) {
+            try {
+                String name = (String) keys.next();
+                result.put(name, fixValue(jsonRecord.get(name)));
+            } catch (Exception e) {
+                // best effort: drop this entry and move on to the next key
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Converts a JSON array into a list of maps.
+     * <p>
+     * Each element goes through {@link #fixValue(Object)}. An element that is
+     * not a map afterwards (a scalar, or a nested list) is wrapped in a
+     * one-entry map under the key {@code "value"}. An element whose retrieval
+     * or conversion throws is silently left out.
+     *
+     * @param jsonList the JSON array to convert
+     * @return a new list with one map per successfully converted element
+     */
+    public static List<Map<String, Object>> getRawList(JSONArray jsonList) {
+        List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
+        for (int index = 0, size = jsonList.length(); index < size; index++) {
+            try {
+                Object element = fixValue(jsonList.get(index));
+                if (element instanceof Map) {
+                    result.add((Map<String, Object>) element);
+                } else {
+                    Map<String, Object> wrapper = new HashMap<String, Object>();
+                    wrapper.put("value", element);
+                    result.add(wrapper);
+                }
+            } catch (Exception e) {
+                // best effort: drop this element and move on to the next one
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Normalizes a value read from a JSON structure.
+     * <ul>
+     * <li>{@link Float} is widened to {@link Double}</li>
+     * <li>{@link Integer} is widened to {@link Long}</li>
+     * <li>{@link JSONArray} becomes a list via {@link #getRawList(JSONArray)}</li>
+     * <li>{@link JSONObject} becomes a map via {@link #getRawRecord(JSONObject)}</li>
+     * <li>anything else, including null, is returned as is</li>
+     * </ul>
+     *
+     * @param originalValue the value to normalize, may be null
+     * @return the normalized value
+     */
+    public static Object fixValue(Object originalValue) {
+        if (originalValue instanceof Float) {
+            // valueOf instead of the deprecated boxing constructor
+            return Double.valueOf(((Float) originalValue).doubleValue());
+        }
+        if (originalValue instanceof Integer) {
+            return Long.valueOf(((Integer) originalValue).longValue());
+        }
+        if (originalValue instanceof JSONArray) {
+            return getRawList((JSONArray) originalValue);
+        }
+        if (originalValue instanceof JSONObject) {
+            return getRawRecord((JSONObject) originalValue);
+        }
+        return originalValue;
+    }
+
+    /**
+     * Parses JSON text into a {@link JSONObject}.
+     *
+     * @param str JSON text describing a single object
+     * @return the parsed object
+     * @throws RuntimeException wrapping the {@link JSONException} when the
+     *             text cannot be parsed; the stack trace is also printed
+     */
+    public static JSONObject fromJsonString(String str) {
+        try {
+            return new JSONObject(str);
+        } catch (JSONException parseFailure) {
+            parseFailure.printStackTrace();
+            throw new RuntimeException(parseFailure);
+        }
+    }
+
+    /**
+     * Builds a {@link JSONObject} from a map, converting nested maps and
+     * lists recursively; all other values are stored unchanged.
+     *
+     * @param map the entries to convert
+     * @return the JSON object, or null when a {@link JSONException} occurs
+     *         (its stack trace is printed)
+     */
+    public static JSONObject toJSONObject(Map<String, Object> map) {
+        JSONObject result = new JSONObject();
+        try {
+            for (Map.Entry<String, Object> entry : map.entrySet()) {
+                String name = entry.getKey();
+                Object raw = entry.getValue();
+                Object converted;
+                if (raw instanceof Map) {
+                    converted = toJSONObject((Map<String, Object>) raw);
+                } else if (raw instanceof List) {
+                    converted = toJSONList((List) raw);
+                } else {
+                    converted = raw;
+                }
+                result.put(name, converted);
+            }
+        } catch (JSONException je) {
+            je.printStackTrace();
+            return null;
+        }
+        return result;
+    }
+
+    /**
+     * Builds a {@link JSONArray} from a list, converting nested maps and
+     * lists recursively; all other elements are appended unchanged.
+     *
+     * @param list the elements to convert
+     * @return a new JSON array with one entry per list element
+     */
+    private static JSONArray toJSONList(List list) {
+        JSONArray result = new JSONArray();
+        for (Object element : list) {
+            Object converted;
+            if (element instanceof Map) {
+                converted = toJSONObject((Map<String, Object>) element);
+            } else if (element instanceof List) {
+                converted = toJSONList((List) element);
+            } else {
+                converted = element;
+            }
+            result.put(converted);
+        }
+        return result;
+    }
+
+    public static Map<String, Object> getMap(Object obj) {
+        Map<String, Object> map = new HashMap<String, Object>();
+        if (obj != null) {
+            if (Map.class.isAssignableFrom(obj.getClass())) {
+                return (Map) obj;
+            } else {
+
+                Field[] fields = obj.getClass().getDeclaredFields();
+                for (int i = 0; i < fields.length; i++) {
+                    if (!fields[i].isAccessible()) {
+                        fields[i].setAccessible(true);
+                    }
+                    try {
+                        String name = fields[i].getName();
+                        Object val = fields[i].get(obj);
+                        if (!Modifier.isStatic(fields[i].getModifiers())
+                                && !Modifier.isTransient(fields[i].getModifiers())) {
+                            if (fields[i].getType().isPrimitive()
+                                    || knownTypes.contains(fields[i].getType())) {
+                                map.put(name, val);
+                            } else if (fields[i].getType().isArray()) {
+                                int length = Array.getLength(val);
+                                Object vals[] = new Object[length];
+                                for (int j = 0; j < length; j++) {
+                                    Object arrVal = Array.get(val, j);
+                                    if (arrVal.getClass().isPrimitive()
+                                            || knownTypes.contains(arrVal.getClass())) {
+                                        vals[j] = arrVal;
+                                    } else {
+                                        vals[j] = getMap(arrVal);
+                                    }
+                                }
+                                map.put(name, vals);
+                            } else {
+                                map.put(name, getMap(val));
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException("Exception while getting value of "
+                                                           + fields[i],
+                                                   e);
+                    }
+                }
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Manual smoke test: builds a nested sample map, converts it to JSON and
+     * back, and prints any mismatch it notices to standard output.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        Map<String, Object> outerMap = new HashMap<String, Object>();
+
+        outerMap.put("doubleValue", 0.3456d);
+        outerMap.put("integerValue", 175647);
+        outerMap.put("longValue", 0x0000005000067000l);
+        outerMap.put("stringValue", "Hello there");
+
+        Map<String, Object> innerMap = null;
+        List<Map<String, Object>> innerList1 = new ArrayList<Map<String, Object>>();
+
+        innerMap = new HashMap<String, Object>();
+        innerMap.put("name", "kishore");
+        innerMap.put("count", 1787265);
+        innerList1.add(innerMap);
+        innerMap = new HashMap<String, Object>();
+        innerMap.put("name", "fred");
+        innerMap.put("count", 11);
+        innerList1.add(innerMap);
+
+        outerMap.put("innerList1", innerList1);
+
+        List<Integer> innerList2 = new ArrayList<Integer>();
+        innerList2.add(65);
+        innerList2.add(2387894);
+        innerList2.add(456);
+
+        outerMap.put("innerList2", innerList2);
+
+        JSONObject jsonObject = toJSONObject(outerMap);
+
+        // Take the flat form outside the try block so a JSONException from
+        // the pretty-printing below cannot leave it unset.
+        String flatJSONString = (jsonObject == null) ? null
+                                                     : jsonObject.toString();
+        if (flatJSONString == null) {
+            System.out.println("Could not serialize the sample map");
+            return;
+        }
+
+        try {
+            System.out.println(jsonObject.toString(3));
+            Object o = jsonObject.get("innerList1");
+            if (!(o instanceof JSONArray)) {
+                System.out.println("Unexpected type of list "
+                        + o.getClass().getName());
+            } else {
+                JSONArray jsonArray = (JSONArray) o;
+                o = jsonArray.get(0);
+                if (!(o instanceof JSONObject)) {
+                    System.out.println("Unexpected type of map "
+                            + o.getClass().getName());
+                } else {
+                    JSONObject innerJSONObject = (JSONObject) o;
+                    System.out.println(innerJSONObject.get("name"));
+                }
+            }
+        } catch (JSONException je) {
+            je.printStackTrace();
+        }
+
+        if (!flatJSONString.equals(toJsonString(outerMap))) {
+            System.out.println("JSON strings don't match!!");
+        }
+
+        Map<String, Object> map = getMapFromJson(flatJSONString);
+
+        Object o = map.get("doubleValue");
+        if (!(o instanceof Double)) {
+            System.out.println("Expected type Double, got "
+                    + (o == null ? null : o.getClass().getName()));
+        } else {
+            // Only compare the value once the type is known to be Double
+            Double doubleValue = (Double) o;
+            if (doubleValue != 0.3456d) {
+                System.out.println("Expected 0.3456, got " + doubleValue);
+            }
+        }
+
+        o = map.get("innerList1");
+        if (!(o instanceof List)) {
+            System.out.println("Expected implementation of List, got "
+                    + o.getClass().getName());
+        } else {
+            List innerList = (List) o;
+            o = innerList.get(0);
+            if (!(o instanceof Map)) {
+                System.out.println("Expected implementation of Map, got "
+                        + o.getClass().getName());
+            } else {
+                innerMap = (Map) o;
+                System.out.println(innerMap.get("name"));
+            }
+        }
+        System.out.println(map);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
new file mode 100644
index 0000000..0e968d9
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.util;
+
+public class SystemUtils {
+
+    private SystemUtils() {
+    }
+
+    public static long getPID() {
+        String processName = java.lang.management.ManagementFactory.getRuntimeMXBean()
+                                                                   .getName();
+        return Long.parseLong(processName.split("@")[0]);
+    }
+
+    public static void main(String[] args) {
+        String msg = "My PID is " + SystemUtils.getPID();
+
+        javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null,
+                                                  msg,
+                                                  "SystemUtils",
+                                                  javax.swing.JOptionPane.DEFAULT_OPTION);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
new file mode 100644
index 0000000..f5293b8
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+/**
+ * Small manual experiment with {@code wait}/{@code notify} on a shared
+ * monitor: one thread waits repeatedly, a second one sleeps while holding the
+ * monitor and then notifies once.
+ */
+public class ThreadTest {
+    static Object lock = new Object();
+
+    /**
+     * Starts the waiting thread, then the notifying thread.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+
+        // T1: waits on the shared monitor forever, reporting each wake-up
+        Runnable waiter = new Runnable() {
+            @Override
+            public void run() {
+                for (;;) {
+                    synchronized (lock) {
+                        System.out.println("In thread T1");
+                        try {
+                            System.out.println("Going to wait");
+                            long start = System.currentTimeMillis();
+                            lock.wait();
+                            long elapsedMillis = System.currentTimeMillis()
+                                    - start;
+                            System.out.println("Woke up T1 after :"
+                                    + elapsedMillis / 1000 + "secs");
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            }
+        };
+        Thread t1 = new Thread(waiter);
+        t1.start();
+
+        // T2: sleeps ten seconds while holding the monitor, notifies once,
+        // then finishes
+        Runnable notifier = new Runnable() {
+            @Override
+            public void run() {
+                synchronized (lock) {
+                    try {
+                        Thread.sleep(10000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    System.out.println("In thread T2");
+                    lock.notify();
+                }
+            }
+        };
+        Thread t2 = new Thread(notifier);
+        t2.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
new file mode 100644
index 0000000..0fe7fa4
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.core.ProcessMonitor;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkProcessMonitor extends DefaultWatcher implements Runnable,
+        ProcessMonitor {
+    static Logger logger = Logger.getLogger(ZkProcessMonitor.class);
+    private List<Object> destinationList;
+    private Map<Integer, Object> destinationMap;
+    private String processZNode;
+    private Object lock = new Object();
+    private volatile boolean updateMode = false;
+    private String taskZNode;
+    private int taskCount;
+
+    public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) {
+        this(address, clusterName, clusterType, null);
+    }
+
+    public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType,
+            CommEventCallback callbackHandler) {
+        super(address, callbackHandler);
+        String root = "/" + ClusterName + "/" + clusterType.toString();
+        this.taskZNode = root + "/task";
+        this.processZNode = root + "/process";
+        destinationMap = new HashMap<Integer, Object>();
+        destinationList = new ArrayList<Object>();
+    }
+
+    public void monitor() {
+        synchronized (mutex) {
+            readConfig();
+        }
+        new Thread(this).start();
+    }
+
+    /**
+     * Rebuilds the destination list and map from the children of the process
+     * znode and refreshes the task count from the children of the task znode.
+     * <p>
+     * Each process child's data is parsed as a JSON object. Every parsed
+     * object is added to the list; it is also added to the map, keyed by its
+     * {@code "partition"} entry, when that entry is present. The partition
+     * may be a JSON number or a string holding a decimal integer.
+     * <p>
+     * The work happens under {@code lock} with {@code updateMode} set; the
+     * flag is always cleared on exit. {@link KeeperException} and
+     * {@link InterruptedException} are logged and not propagated, in which
+     * case the previously published list and map are left as they were.
+     */
+    private void readConfig() {
+        try {
+            synchronized (lock) {
+                Map<Integer, Object> tempDestinationMap = new HashMap<Integer, Object>();
+                List<Object> tempDestinationList = new ArrayList<Object>();
+                updateMode = true;
+                List<String> tasks = zk.getChildren(taskZNode, false);
+                this.taskCount = tasks.size();
+                List<String> children = zk.getChildren(processZNode, false);
+                for (String name : children) {
+                    String path = processZNode + "/" + name;
+                    Stat stat = zk.exists(path, false);
+                    if (stat == null) {
+                        // the child disappeared after it was listed
+                        continue;
+                    }
+                    byte[] data = zk.getData(path, false, stat);
+                    Map<String, Object> map = JSONUtil.getMapFromJson(new String(data));
+                    // JSONUtil hands numeric values back as Long/Double, so
+                    // a blind cast to String would throw for {"partition": 3}
+                    Object partition = map.get("partition");
+                    if (partition instanceof Number) {
+                        tempDestinationMap.put(((Number) partition).intValue(),
+                                               map);
+                    } else if (partition != null) {
+                        tempDestinationMap.put(Integer.parseInt(partition.toString()),
+                                               map);
+                    }
+                    tempDestinationList.add(map);
+                }
+                destinationList.clear();
+                destinationMap.clear();
+                destinationList.addAll(tempDestinationList);
+                destinationMap.putAll(tempDestinationMap);
+                logger.info("Updated Destination List to" + destinationList);
+                logger.info("Updated Destination Map to" + destinationMap);
+            }
+        } catch (KeeperException e) {
+            logger.warn("Ignorable exception if it happens once in a while", e);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted exception cause while reading process znode",
+                         e);
+        } finally {
+            updateMode = false;
+        }
+    }
+
+    /**
+     * Monitoring loop: while holding {@code mutex}, sets a watch on the
+     * children of the process znode, re-reads the configuration, and waits on
+     * {@code mutex} before repeating.
+     * <p>
+     * The loop ends when a {@link KeeperException} or
+     * {@link InterruptedException} escapes; the exception is logged as a
+     * warning.
+     */
+    public void run() {
+        try {
+            for (;;) {
+                synchronized (mutex) {
+                    logger.info("Setting watch on " + processZNode);
+                    // the returned child list is not needed here; the call is
+                    // made with watch=true only to register the watch
+                    zk.getChildren(processZNode, true);
+                    readConfig();
+                    mutex.wait();
+                }
+            }
+        } catch (KeeperException keeperFailure) {
+            logger.warn("KeeperException in ProcessMonitor.run", keeperFailure);
+        } catch (InterruptedException interrupted) {
+            logger.warn("InterruptedException in ProcessMonitor.run", interrupted);
+        }
+    }
+
+    /**
+     * Returns the internal destination list (the live instance, not a copy).
+     * <p>
+     * When an update is flagged as in progress the call first acquires
+     * {@code lock}, which the update holds while it runs.
+     *
+     * @return the destination list
+     */
+    public List<Object> getDestinationList() {
+        if (!updateMode) {
+            return destinationList;
+        }
+        synchronized (lock) {
+            return destinationList;
+        }
+    }
+
+    /**
+     * Returns the internal destination map (the live instance, not a copy).
+     * <p>
+     * When an update is flagged as in progress the call first acquires
+     * {@code lock}, which the update holds while it runs.
+     *
+     * @return the destination map, keyed by partition number
+     */
+    public Map<Integer, Object> getDestinationMap() {
+        if (!updateMode) {
+            return destinationMap;
+        }
+        synchronized (lock) {
+            return destinationMap;
+        }
+    }
+
+    /**
+     * Returns the number of children the task znode had when the
+     * configuration was last read.
+     *
+     * @return the recorded task count
+     */
+    @Override
+    public int getTaskCount() {
+        return this.taskCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkQueue.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkQueue.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkQueue.java
new file mode 100644
index 0000000..8bac50b
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkQueue.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.util.IOUtil;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkQueue extends DefaultWatcher {
+    /**
+     * Creates a producer-consumer queue backed by the znode {@code name},
+     * creating that znode as a persistent node when it does not exist yet.
+     * <p>
+     * Failures while checking for or creating the znode are printed to
+     * standard output and otherwise ignored.
+     * 
+     * @param address passed on to the superclass constructor
+     * @param name path of the queue's root znode
+     */
+    public ZkQueue(String address, String name) {
+        super(address);
+        this.root = name;
+        if (zk == null) {
+            // no client available, so the root znode cannot be set up
+            return;
+        }
+        try {
+            boolean rootMissing = zk.exists(root, false) == null;
+            if (rootMissing) {
+                zk.create(root,
+                          new byte[0],
+                          Ids.OPEN_ACL_UNSAFE,
+                          CreateMode.PERSISTENT);
+            }
+        } catch (KeeperException keeperFailure) {
+            System.out.println("Keeper exception when instantiating queue: "
+                    + keeperFailure.toString());
+        } catch (InterruptedException interrupted) {
+            System.out.println("Interrupted exception");
+        }
+    }
+
+    /**
+     * Add element to the queue.
+     * 
+     * @param obj element to add
+     * @return true if add successful, false otherwise
+     */
+
+    public boolean produce(Object obj) throws KeeperException,
+            InterruptedException {
+        byte[] value = IOUtil.serializeToBytes(obj);
+        zk.create(root + "/element",
+                  value,
+                  Ids.OPEN_ACL_UNSAFE,
+                  CreateMode.PERSISTENT_SEQUENTIAL);
+        return true;
+    }
+
+    /**
+     * Removes and returns the first element of the queue, that is the child
+     * of the root znode with the lowest sequence number.
+     * <p>
+     * While the queue is empty the call waits on {@code mutex}; a watch is
+     * set on the root znode's children each time they are listed.
+     * 
+     * @return the deserialized first element of the queue
+     * @throws KeeperException if a ZooKeeper operation fails, including the
+     *             delete when the element's version is not 0
+     * @throws InterruptedException if the wait or a ZooKeeper call is
+     *             interrupted
+     */
+    public Object consume() throws KeeperException, InterruptedException {
+        // children are named "element<sequence>"; skip the fixed prefix
+        final int prefixLength = "element".length();
+
+        while (true) {
+            synchronized (mutex) {
+                List<String> list = zk.getChildren(root, true);
+                if (list.isEmpty()) {
+                    System.out.println("Going to wait");
+                    mutex.wait();
+                    continue;
+                }
+
+                // pick the child with the smallest sequence number
+                String name = list.get(0);
+                int min = Integer.parseInt(name.substring(prefixLength));
+                for (String s : list) {
+                    int tempValue = Integer.parseInt(s.substring(prefixLength));
+                    if (tempValue < min) {
+                        min = tempValue;
+                        name = s;
+                    }
+                }
+
+                String zNode = root + "/" + name;
+                System.out.println("Temporary value: " + zNode);
+                byte[] b = zk.getData(zNode, false, null);
+                zk.delete(zNode, 0);
+                return IOUtil.deserializeToObject(b);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskManager.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskManager.java
new file mode 100644
index 0000000..053571d
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskManager.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.core.TaskManager;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkTaskManager extends DefaultWatcher implements TaskManager {
+    static Logger logger = Logger.getLogger(ZkTaskManager.class);
+    String tasksListRoot;
+    String processListRoot;
+
+    /**
+     * Creates a task manager without a callback handler.
+     * 
+     * @param address passed on to the four-argument constructor
+     * @param ClusterName first path segment of the root znode
+     * @param clusterType second path segment of the root znode
+     */
+    public ZkTaskManager(String address, String ClusterName, ClusterType clusterType) {
+        this(address, ClusterName, clusterType, null);
+    }
+
+    /**
+     * Creates a task manager rooted at {@code /<ClusterName>/<clusterType>},
+     * with tasks listed under {@code <root>/task} and processes under
+     * {@code <root>/process}.
+     * 
+     * @param address passed on to the superclass constructor
+     * @param ClusterName first path segment of the root znode
+     * @param clusterType second path segment of the root znode
+     * @param callbackHandler passed on to the superclass constructor, may be
+     *            null
+     */
+    public ZkTaskManager(String address, String ClusterName, ClusterType clusterType,
+            CommEventCallback callbackHandler) {
+        super(address, callbackHandler);
+        String clusterRoot = "/" + ClusterName + "/" + clusterType.toString();
+        this.root = clusterRoot;
+        this.processListRoot = root + "/process";
+        this.tasksListRoot = root + "/task";
+    }
+
+    /**
+     * Blocks the calling thread until a task could be claimed, then returns
+     * the data stored in that task's znode. The call assumes that the tasks
+     * have already been set up; while the task or process root znode is
+     * missing, or while every task is taken, it waits on {@code mutex}.
+     * <p>
+     * A task is claimed by creating the ephemeral znode
+     * {@code <processRoot>/task-<id>}. Its content is the task's JSON data
+     * extended with the entries of {@code customTaskData} (existing keys
+     * win) and with {@code taskSize}, {@code tasksRootNode} and
+     * {@code processRootNode}.
+     * <p>
+     * {@link KeeperException} and {@link InterruptedException} are logged
+     * and the attempt is repeated.
+     * 
+     * @param customTaskData extra entries to merge into the task data, may
+     *            be null
+     * @return a {@code Map<String, Object>} with the data of the claimed task
+     */
+    @Override
+    public Object acquireTask(Map<String, String> customTaskData) {
+        while (true) {
+            synchronized (mutex) {
+                try {
+                    Stat tExists = zk.exists(tasksListRoot, false);
+                    if (tExists == null) {
+                        logger.error("Tasks znode:" + tasksListRoot
+                                + " not setup.Going to wait");
+                        tExists = zk.exists(tasksListRoot, true);
+                        if (tExists == null) {
+                            mutex.wait();
+                        }
+                        continue;
+                    }
+                    Stat pExists = zk.exists(processListRoot, false);
+                    if (pExists == null) {
+                        logger.error("Process root znode:" + processListRoot
+                                + " not setup.Going to wait");
+                        pExists = zk.exists(processListRoot, true);
+                        if (pExists == null) {
+                            mutex.wait();
+                        }
+                        continue;
+                    }
+                    // setting watch true to tasks node will trigger call back
+                    // if there is any change to task node,
+                    // this is useful to add additional tasks
+                    List<String> tasks = zk.getChildren(tasksListRoot, true);
+                    List<String> processes = zk.getChildren(processListRoot,
+                                                            true);
+                    if (processes.size() < tasks.size()) {
+                        ArrayList<String> tasksAvailable = new ArrayList<String>();
+                        for (int i = 0; i < tasks.size(); i++) {
+                            tasksAvailable.add("" + i);
+                        }
+                        for (String s : processes) {
+                            // process znodes are named "task-<id>"; ignore
+                            // any child that does not follow that pattern
+                            String[] parts = s.split("-");
+                            if (parts.length > 1) {
+                                tasksAvailable.remove(parts[1]);
+                            }
+                        }
+                        // try pick up a random task
+                        Random random = new Random();
+                        int id = Integer.parseInt(tasksAvailable.get(random.nextInt(tasksAvailable.size())));
+                        String pNode = processListRoot + "/" + "task-" + id;
+                        String tNode = tasksListRoot + "/" + "task-" + id;
+                        Stat pNodeStat = zk.exists(pNode, false);
+                        if (pNodeStat == null) {
+                            Stat tNodeStat = zk.exists(tNode, false);
+                            byte[] bytes = zk.getData(tNode, false, tNodeStat);
+                            Map<String, Object> map = JSONUtil.getMapFromJson(new String(bytes));
+                            if (customTaskData != null) {
+                                for (String key : customTaskData.keySet()) {
+                                    if (!map.containsKey(key)) {
+                                        map.put(key, customTaskData.get(key));
+                                    }
+                                }
+
+                            }
+                            map.put("taskSize", "" + tasks.size());
+                            map.put("tasksRootNode", tasksListRoot);
+                            map.put("processRootNode", processListRoot);
+                            String create = zk.create(pNode,
+                                                      JSONUtil.toJsonString(map)
+                                                              .getBytes(),
+                                                      Ids.OPEN_ACL_UNSAFE,
+                                                      CreateMode.EPHEMERAL);
+                            logger.info("Created process Node:" + pNode + " :"
+                                    + create);
+                            return map;
+                        }
+                    } else {
+                        // all the tasks are taken up, will wait for the
+                        // watches set above to signal a change
+                        logger.info("No task available to take up. Going to wait");
+                        mutex.wait();
+                    }
+                } catch (KeeperException e) {
+                    logger.info("Warn:mostly ignorable " + e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    logger.info("Warn:mostly ignorable " + e.getMessage(), e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
new file mode 100644
index 0000000..0b58018
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.DefaultWatcher;
+import org.apache.s4.comm.util.CommUtil;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkTaskSetup extends DefaultWatcher {
+    static Logger logger = Logger.getLogger(ZkTaskSetup.class);
+    String tasksListRoot;
+    String processListRoot;
+
+    public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) {
+        this(address, clusterName, clusterType, null);
+    }
+
+    /**
+     * Sets root to {@code /<clusterName>/<clusterType>} and derives the /task and /process paths.
+     * 
+     * @param address passed, together with {@code callbackHandler}, to the DefaultWatcher constructor
+     * @param clusterName first segment of the root path; {@code clusterType.toString()} is the second
+     */
+    public ZkTaskSetup(String address, String clusterName, ClusterType clusterType,
+            CommEventCallback callbackHandler) {
+        super(address, callbackHandler);
+        
+        this.root = "/" + clusterName + "/" + clusterType.toString();
+        this.tasksListRoot = root + "/task";
+        this.processListRoot = root + "/process";
+    }
+
+    public void setUpTasks(Object[] data) {
+        setUpTasks("-1", data);
+    }
+
+    /**
+     * Creates one {@code task-<i>} znode per element of data unless the stored config is current.
+     * 
+     * @param version dotted config version; "-1" (used by the one-argument overload) skips the version check
+     * @param data task payloads, each written as JSON; any exception is rethrown as a RuntimeException
+     */
+    public void setUpTasks(String version, Object[] data) {
+        try {
+            logger.info("Trying to set up configuration with new version:"
+                    + version);
+            if (!version.equals("-1")) {
+                if (!isConfigVersionNewer(version)) {
+                    logger.info("Config version not newer than current version");
+                    return;
+                } else {
+                    cleanUp();
+                }
+            } else {
+                logger.info("Not checking version number since it is set to -1");
+            }
+
+            // stop if the stored task nodes already match data; otherwise wipe them before recreating
+            if (!isConfigDataNewer(data)) {
+                logger.info("Config data not newer than current version");
+                return;
+            } else {
+                logger.info("Found newer Config data. Cleaning old data");
+                cleanUp();
+            }
+
+            // make sure root exists, creating its parent (/<clusterName>) first if needed
+            if (zk != null) {
+                Stat s;
+                s = zk.exists(root, false);
+                if (s == null) {
+                    String parent = new File(root).getParent()
+                                                  .replace(File.separatorChar,
+                                                           '/');
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("parent:" + parent);
+                    }
+                    Stat exists = zk.exists(parent, false);
+                    if (exists == null) {
+                        zk.create(parent,
+                                  new byte[0],
+                                  Ids.OPEN_ACL_UNSAFE,
+                                  CreateMode.PERSISTENT);
+                    }
+                    zk.create(root,
+                              new byte[0],
+                              Ids.OPEN_ACL_UNSAFE,
+                              CreateMode.PERSISTENT);
+                }
+            }
+            Stat s;
+            s = zk.exists(tasksListRoot, false);
+            if (s == null) {
+                Map<String, String> map = new HashMap<String, String>();
+                map.put("config.version", version);
+                String jsonString = JSONUtil.toJsonString(map);
+                zk.create(tasksListRoot,
+                          jsonString.getBytes(),
+                          Ids.OPEN_ACL_UNSAFE,
+                          CreateMode.PERSISTENT);
+            }
+            s = zk.exists(processListRoot, false);
+            if (s == null) {
+                zk.create(processListRoot,
+                          new byte[0],
+                          Ids.OPEN_ACL_UNSAFE,
+                          CreateMode.PERSISTENT);
+
+            }
+
+            for (int i = 0; i < data.length; i++) {
+                String nodeName = tasksListRoot + "/" + "task" + "-" + i;
+                Stat sTask = zk.exists(nodeName, false);
+                if (sTask == null) {
+                    logger.info("Creating taskNode: " + nodeName);
+                    byte[] byteBuffer = JSONUtil.toJsonString(data[i])
+                                                .getBytes();
+                    zk.create(nodeName,
+                              byteBuffer,
+                              Ids.OPEN_ACL_UNSAFE,
+                              CreateMode.PERSISTENT);
+                } else {
+                    logger.warn("TaskNode already exisits: " + nodeName);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Keeper exception when creating task nodes: "
+                                 + e.toString(),
+                         e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Tells whether {@code data} differs from the task znodes stored under tasksListRoot.
+     * Each stored node's JSON map must pair with a distinct element of data (each cast to a Map).
+     * @return false only when every element of data was paired; true otherwise or when nothing is stored
+     */
+    private boolean isConfigDataNewer(Object[] data) {
+        try {
+            if (zk.exists(tasksListRoot, false) == null) {
+                return true;
+            }
+            List<String> taskNodes = zk.getChildren(tasksListRoot, false);
+            if (taskNodes.size() != data.length) {
+                return true;
+            }
+            boolean[] claimed = new boolean[data.length];
+            for (String taskNode : taskNodes) {
+                String taskPath = tasksListRoot + "/" + taskNode;
+                Stat taskStat = zk.exists(taskPath, false);
+                byte[] stored = zk.getData(taskPath, false, taskStat);
+                Map<String, Object> storedMap = (Map<String, Object>) JSONUtil.getMapFromJson(new String(stored));
+                // claim the first still-unclaimed element of data that equals the stored map
+                int idx = 0;
+                while (idx < data.length) {
+                    Map<String, Object> candidate = (Map<String, Object>) data[idx];
+                    if (!claimed[idx] && CommUtil.compareMaps(candidate, storedMap)) {
+                        claimed[idx] = true;
+                        break;
+                    }
+                    idx++;
+                }
+            }
+            for (boolean wasClaimed : claimed) {
+                if (!wasClaimed) {
+                    return true;
+                }
+            }
+            return false;
+        } catch (Exception e) {
+            throw new RuntimeException(" Exception in isConfigDataNewer method ", e);
+        }
+    }
+
+    /**
+     * Compares version with the stored "config.version" as dot-separated integers, left to right.
+     * The first differing component decides; a missing trailing component counts as 0.
+     * @return true if version is strictly newer, or if nothing is stored to compare against
+     * @throws Exception on ZooKeeper/JSON failure or a non-numeric version component
+     */
+    private boolean isConfigVersionNewer(String version) throws Exception {
+        Stat s = zk.exists(tasksListRoot, false);
+        if (s == null) {
+            logger.info("znode " + tasksListRoot
+                    + " does not exist, so creating new one is fine");
+            return true;
+        }
+        byte[] data = zk.getData(tasksListRoot, false, s);
+        if (data == null || data.length == 0) {
+            logger.info("No data at znode " + tasksListRoot
+                    + " so version checking will not be done");
+            return true;
+        }
+        Map<String, Object> map = JSONUtil.getMapFromJson(new String(data));
+        if (!map.containsKey("config.version")) {
+            return true;
+        }
+        String currentVersion = map.get("config.version").toString();
+        logger.info("Current config version:" + currentVersion);
+        String[] curV = currentVersion.split("\\.");
+        String[] newV = version.split("\\.");
+        boolean update = false;
+        for (int i = 0; i < Math.max(curV.length, newV.length); i++) {
+            int newPart = i < newV.length ? Integer.parseInt(newV[i]) : 0;
+            int curPart = i < curV.length ? Integer.parseInt(curV[i]) : 0;
+            if (newPart != curPart) {
+                update = newPart > curPart;
+                break;
+            }
+        }
+        if (!update) {
+            logger.info("Current config version is newer. Config will not be updated");
+        }
+        return update;
+    }
+
+    /**
+     * Deletes every child of the task-list and process-list znodes, then the two znodes themselves.
+     * Every delete passes expected version 0. Deleting process children logs a warning first,
+     * since those processes may still be running.
+     *
+     * @return true when everything was deleted; false (after logging) as soon as any call throws
+     */
+    public boolean cleanUp() {
+        try {
+            logger.info("Cleaning :" + tasksListRoot);
+            if (zk.exists(tasksListRoot, false) != null) {
+                for (String taskNode : zk.getChildren(tasksListRoot, false)) {
+                    String taskPath = tasksListRoot + "/" + taskNode;
+                    logger.info("Cleaning :" + taskPath);
+                    zk.delete(taskPath, 0);
+                }
+                zk.delete(tasksListRoot, 0);
+            }
+
+            if (zk.exists(processListRoot, false) != null) {
+                List<String> processNodes = zk.getChildren(processListRoot, false);
+                if (!processNodes.isEmpty()) {
+                    logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior");
+                }
+                for (String processNode : processNodes) {
+                    String processPath = processListRoot + "/" + processNode;
+                    logger.info("Cleaning :" + processPath);
+                    zk.delete(processPath, 0);
+                }
+                logger.info("Finished cleaning :" + processListRoot);
+                zk.delete(processListRoot, 0);
+            }
+            return true;
+        } catch (Exception e) {
+            logger.error("Exception while cleaning up: " + e.getMessage(), e);
+            return false;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
new file mode 100644
index 0000000..c743db8
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.zk;
+
+import org.apache.s4.comm.core.DefaultWatcher;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+public class ZkUtil extends DefaultWatcher {
+
+    public ZkUtil(String address) {
+        super(address);
+        // all setup is delegated to DefaultWatcher(address); ZkUtil declares no fields of its own
+    }
+
+    /** Returns how many children {@code path} has; any exception is rethrown as a RuntimeException. */
+    public int getChildCount(String path) {
+        try {
+            return zk.getChildren(path, false).size();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Returns the child names of {@code path}; any exception is rethrown as a RuntimeException. */
+    public List<String> getChildren(String path) {
+        try {
+            return zk.getChildren(path, false);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Returns the bytes stored at {@code path}, passing {@code false} and a null Stat to zk.getData.
+     *  Any exception is rethrown wrapped in a RuntimeException. */
+    public byte[] getData(String path) {
+        try {
+            return zk.getData(path, false, null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Creates a persistent znode at {@code path} with empty data. */
+    public void create(String path) {
+        create(path, "");
+    }
+
+    /** Creates a persistent, open-ACL znode at {@code path}; any exception becomes a RuntimeException. */
+    public void create(String path, String data) {
+        try {
+            byte[] payload = data.getBytes();
+            zk.create(path, payload, Ids.OPEN_ACL_UNSAFE,
+                      CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Deletes {@code path} depth-first: every descendant first, then {@code path} itself. */
+    public void deleteRecursive(String path) {
+        for (String childName : getChildren(path)) {
+            deleteRecursive(path + "/" + childName);
+        }
+        delete(path);
+    }
+
+    public void delete(String path) {
+        try {
+            zk.delete(path, -1);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Command-line entry point: {@code <zkAddress> <methodName> [args...]}.
+     * Invokes the first public method of ZkUtil whose name and parameter count match,
+     * passing the remaining arguments as strings, and prints a non-null result.
+     */
+    public static void main(String[] args) throws Exception {
+        // both the address and the method name are mandatory
+        if (args.length < 2) {
+            printUsage();
+            return; // not reached in practice: printUsage() calls System.exit(1)
+        }
+        String address = args[0];
+        String methodName = args[1];
+        String[] methodArgs = Arrays.copyOfRange(args, 2, args.length);
+
+        Method method = null;
+        for (Method met : ZkUtil.class.getMethods()) {
+            if (met.getName().equals(methodName)
+                    && met.getParameterTypes().length == methodArgs.length) {
+                method = met;
+                break;
+            }
+        }
+
+        if (method != null) {
+            ZkUtil zkUtil = new ZkUtil(address);
+            Object ret = method.invoke(zkUtil, (Object[]) methodArgs);
+            if (ret != null) {
+                System.out.println("**********");
+                System.out.println(ret);
+                System.out.println("**********");
+            }
+        } else {
+            printUsage();
+        }
+    }
+
+    /** Prints the usage line and the name and parameter types of every public method, then exits with status 1. */
+    private static void printUsage() {
+        System.out.println("USAGE");
+        System.out.println("java <zkadress> methodName arguments");
+        for (Method met : ZkUtil.class.getMethods()) {
+            System.out.println(met.getName() + ":"
+                    + Arrays.toString(met.getParameterTypes()));
+        }
+        System.exit(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/main/java/org/apache/s4/MainApp.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/MainApp.java b/s4-core/src/main/java/org/apache/s4/MainApp.java
index a88e7a5..dedc8d9 100644
--- a/s4-core/src/main/java/org/apache/s4/MainApp.java
+++ b/s4-core/src/main/java/org/apache/s4/MainApp.java
@@ -13,7 +13,7 @@
  * language governing permissions and limitations under the
  * License. See accompanying LICENSE file. 
  */
-package io.s4;
+package org.apache.s4;
 
 import org.apache.s4.ft.SafeKeeper;
 import org.apache.s4.processor.AbstractPE;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java b/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
index bfb6eeb..5e4fad2 100644
--- a/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
+++ b/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
@@ -123,7 +123,7 @@ public class ObjectBuilder {
         String[] query = { "name", "count", "freq" };
         String target[] = { "ACDW", "11" };
 
-        org.apache.s4.message.Request.ClientRInfo rinfo = new io.s4.message.Request.ClientRInfo();
+        org.apache.s4.message.Request.ClientRInfo rinfo = new org.apache.s4.message.Request.ClientRInfo();
         rinfo.setRequesterUUID(UUID.randomUUID());
         Request req = new org.apache.s4.message.SinglePERequest(Arrays.asList(target),
                                                         Arrays.asList(query),
@@ -131,7 +131,7 @@ public class ObjectBuilder {
 
         System.out.println(req.toString());
 
-        InstanceCreator<org.apache.s4.message.Request.RInfo> infoCreator = new InstanceCreator<io.s4.message.Request.RInfo>() {
+        InstanceCreator<org.apache.s4.message.Request.RInfo> infoCreator = new InstanceCreator<org.apache.s4.message.Request.RInfo>() {
             public org.apache.s4.message.Request.RInfo createInstance(Type type) {
                 return new org.apache.s4.message.Request.ClientRInfo();
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
index ae2faca..ca4d56f 100644
--- a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
@@ -84,12 +84,12 @@ public class DefaultPartitioner implements Partitioner, VariableKeyPartitioner {
         // Some event types that need special handling
         if (event instanceof org.apache.s4.message.Request) {
             // construct key from request's target
-            org.apache.s4.message.Request r = (io.s4.message.Request) event;
+            org.apache.s4.message.Request r = (org.apache.s4.message.Request) event;
             return r.partition(hasher, delimiter, partitionCount);
 
         } else if (event instanceof org.apache.s4.message.Response) {
             // partition id is encoded in Response, so use it directly.
-            org.apache.s4.message.Response r = (io.s4.message.Response) event;
+            org.apache.s4.message.Response r = (org.apache.s4.message.Response) event;
             return r.partition(partitionCount);
 
         } else if (compoundKeyNames == null) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
index ae877fb..0cca3b0 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
@@ -119,7 +119,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
      * (non-Javadoc)
      * 
      * @see
-     * org.apache.s4.processor.AsynchronousEventProcessor#queueWork(io.s4.collector.
+     * org.apache.s4.processor.AsynchronousEventProcessor#queueWork(org.apache.s4.collector.
      * EventWrapper)
      */
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/CheckpointingTest.java b/s4-core/src/test/java/io/s4/ft/CheckpointingTest.java
deleted file mode 100644
index c52a8b4..0000000
--- a/s4-core/src/test/java/io/s4/ft/CheckpointingTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package io.s4.ft;
-
-import io.s4.serialize.KryoSerDeser;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.esotericsoftware.reflectasm.FieldAccess;
-
-public class CheckpointingTest extends S4TestCase {
-
-    private static Factory zookeeperServerConnectionFactory = null;
-    private S4App app;
-
-    @Before
-    public void prepare() throws Exception {
-        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
-        app = new S4App(getClass(), "s4_core_conf_fs_backend.xml");
-        app.initializeS4App();
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
-        if (app!=null) {
-            app.destroy();
-        }
-    }
-
-    @Test
-    public void testCheckpointStorage() throws Exception {
-            final ZooKeeper zk = TestUtils.createZkClient();
-
-            
-
-            // 2. generate a simple event that creates and changes the state of
-            // the
-            // PE
-
-            // NOTE: coordinate through zookeeper
-            final CountDownLatch signalValue1Set = new CountDownLatch(1);
-
-            TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
-            final CountDownLatch signalCheckpointed = new CountDownLatch(1);
-            TestUtils.watchAndSignalCreation("/checkpointed",
-                    signalCheckpointed, zk);
-            EventGenerator gen = new EventGenerator();
-            gen.injectValueEvent(new KeyValue("value1", "message1"), "Stream1",
-                    0);
-
-            signalValue1Set.await();
-            StatefulTestPE pe = (StatefulTestPE) S4TestCase.registeredPEs
-                    .get(new SafeKeeperId("statefulPE", "value"));
-            Assert.assertEquals("message1", pe.getValue1());
-            Assert.assertEquals("", pe.getValue2());
-
-            // 3. generate a checkpoint event 
-            gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
-                    "Stream1", 0);
-            signalCheckpointed.await();
-
-            // NOTE: the backend has asynchronous save operations
-            Thread.sleep(1000);
-
-            SafeKeeperId safeKeeperId = pe.getSafeKeeperId();
-            File expected = new File(System.getProperty("user.dir")
-                    + File.separator
-                    + "tmp"
-                    + File.separator
-                    + "storage"
-                    + File.separator
-                    + safeKeeperId.getPrototypeId()
-                    + File.separator
-                    + Base64.encodeBase64URLSafeString(safeKeeperId
-                            .getStringRepresentation().getBytes()));
-
-            // 4. verify that state was correctly persisted
-            Assert.assertTrue(expected.exists());
-
-            StatefulTestPE refPE = new StatefulTestPE();
-            refPE.setValue1("message1");
-            refPE.setId("statefulPE");
-            refPE.setKeys(new String[] {});
-            KryoSerDeser kryoSerDeser = new KryoSerDeser();
-            byte[] refBytes = kryoSerDeser.serialize(refPE);
-
-            Assert.assertTrue(Arrays.equals(refBytes,
-                    TestUtils.readFileAsByteArray(expected)));
-
-            app.destroy();
-            
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/EventGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/EventGenerator.java b/s4-core/src/test/java/io/s4/ft/EventGenerator.java
deleted file mode 100644
index 66c5e79..0000000
--- a/s4-core/src/test/java/io/s4/ft/EventGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package io.s4.ft;
-
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.dispatcher.partitioner.KeyInfo;
-import io.s4.emitter.CommLayerEmitter;
-import io.s4.schema.Schema;
-import io.s4.serialize.KryoSerDeser;
-import io.s4.serialize.SerializerDeserializer;
-import io.s4.util.LoadGenerator;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class EventGenerator {
-
-    private CommLayerEmitter eventEmitter;
-
-    public EventGenerator() {
-        SerializerDeserializer serDeser = new KryoSerDeser();
-
-        eventEmitter = new CommLayerEmitter();
-        eventEmitter.setAppName("s4");
-        eventEmitter.setListenerAppName("s4");
-        eventEmitter.setClusterManagerAddress("localhost");
-        eventEmitter
-                .setSenderId(String.valueOf(System.currentTimeMillis() / 1000));
-        eventEmitter.setSerDeser(serDeser);
-        eventEmitter.init();
-
-        LoadGenerator generator = new LoadGenerator();
-        generator.setEventEmitter(eventEmitter);
-    }
-
-    public void injectValueEvent(KeyValue keyValue, String streamName,
-            int partitionId) throws JSONException {
-
-        Schema schema = new Schema(KeyValue.class);
-        JSONObject jsonRecord = new JSONObject("{key:" + keyValue.getKey()
-                + ",value:" + keyValue.getValue() + "}");
-        Object event = LoadGenerator.makeRecord(jsonRecord, schema);
-        CompoundKeyInfo compoundKeyInfo = new CompoundKeyInfo();
-        compoundKeyInfo.setCompoundKey("key");
-        compoundKeyInfo.setCompoundValue("value");
-        List<CompoundKeyInfo> compoundKeyInfos = new ArrayList<CompoundKeyInfo>();
-        compoundKeyInfos.add(compoundKeyInfo);
-        EventWrapper eventWrapper = new EventWrapper(streamName, event,
-                compoundKeyInfos);
-        eventEmitter.emit(partitionId, eventWrapper);
-    }
-
-    public void injectEvent(Object event, String streamName, int partitionId,
-            List<CompoundKeyInfo> compoundKeyInfos) throws JSONException {
-
-        EventWrapper eventWrapper = new EventWrapper(streamName, event,
-                compoundKeyInfos);
-        eventEmitter.emit(partitionId, eventWrapper);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/KeyValue.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/KeyValue.java b/s4-core/src/test/java/io/s4/ft/KeyValue.java
deleted file mode 100644
index c7ff9e3..0000000
--- a/s4-core/src/test/java/io/s4/ft/KeyValue.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package io.s4.ft;
-
-public class KeyValue {
-
-	String key;
-	String value;
-
-	public KeyValue() {
-	}
-
-	public KeyValue(String key, String value) {
-		super();
-		this.key = key;
-		this.value = value;
-	}
-
-	public String getKey() {
-		return key;
-	}
-
-	public String getValue() {
-		return value;
-	}
-
-	public void setKey(String key) {
-		this.key = key;
-	}
-
-	public void setValue(String value) {
-		this.value = value;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/RecoveryTest.java b/s4-core/src/test/java/io/s4/ft/RecoveryTest.java
deleted file mode 100644
index 177de92..0000000
--- a/s4-core/src/test/java/io/s4/ft/RecoveryTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package io.s4.ft;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RecoveryTest extends S4TestCase {
-
-    public static long ZOOKEEPER_PORT = 21810;
-    private Process forkedS4App = null;
-    private static Factory zookeeperServerConnectionFactory = null;
-
-    @Before
-    public void prepare() throws Exception {
-        TestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
-        final ZooKeeper zk = TestUtils.createZkClient();
-        try {
-            zk.delete("/value1Set", -1);
-        } catch (Exception ignored) {
-        }
-        try {
-            // FIXME can't figure out where this is retained
-            zk.delete("/value2Set", -1);
-        } catch (Exception ignored) {
-        }
-        try {
-            // FIXME can't figure out where this is retained
-            zk.delete("/checkpointed", -1);
-        } catch (Exception ignored) {
-        }
-        zk.close();
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
-        TestUtils.killS4App(forkedS4App);
-    }
-
-    @Test
-    public void testCheckpointRestorationThroughApplicationEvent()
-            throws Exception {
-        final ZooKeeper zk = TestUtils.createZkClient();
-        // 1. instantiate remote S4 app
-        forkedS4App = TestUtils.forkS4App(getClass().getName(),
-                "s4_core_conf_fs_backend.xml");
-        // TODO synchro
-        Thread.sleep(4000);
-
-        CountDownLatch signalValue1Set = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
-
-        // 2. generate a simple event that changes the state of the PE
-        // --> this event triggers recovery
-        // we inject a value for value2 field (was for value1 in
-        // checkpointing
-        // test). This should trigger recovery and provide a pe with value1
-        // and
-        // value2 set:
-        // value1 from recovery, and value2 from injected event.
-        EventGenerator gen = new EventGenerator();
-        gen.injectValueEvent(new KeyValue("value1", "message1"), "Stream1", 0);
-        signalValue1Set.await();
-        final CountDownLatch signalCheckpointed = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed,
-                zk);
-        // trigger checkpoint
-        gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
-                "Stream1", 0);
-        signalCheckpointed.await();
-        // signalCheckpointAddedByBK.await();
-
-        signalValue1Set = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
-        gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
-        signalValue1Set.await();
-        Assert.assertEquals("value1=message1b ; value2=",
-                TestUtils.readFile(StatefulTestPE.DATA_FILE));
-
-        Thread.sleep(2000);
-        // kill app
-        forkedS4App.destroy();
-        // S4App.killS4App(getClass().getName());
-
-        StatefulTestPE.DATA_FILE.delete();
-
-        forkedS4App = TestUtils.forkS4App(getClass().getName(),
-                "s4_core_conf_fs_backend.xml");
-        // TODO synchro
-        Thread.sleep(2000);
-        // trigger recovery by sending application event to set value 2
-        CountDownLatch signalValue2Set = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
-
-        gen.injectValueEvent(new KeyValue("value2", "message2"), "Stream1", 0);
-        signalValue2Set.await(10, TimeUnit.SECONDS);
-
-        // we should get "message1" (checkpointed) instead of "message1b"
-        // (latest)
-        Assert.assertEquals("value1=message1 ; value2=message2",
-                TestUtils.readFile(StatefulTestPE.DATA_FILE));
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/S4App.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/S4App.java b/s4-core/src/test/java/io/s4/ft/S4App.java
deleted file mode 100644
index 749a771..0000000
--- a/s4-core/src/test/java/io/s4/ft/S4App.java
+++ /dev/null
@@ -1,201 +0,0 @@
-package io.s4.ft;
-
-import io.s4.processor.AbstractPE;
-import io.s4.processor.PEContainer;
-import io.s4.util.Watcher;
-import io.s4.util.clock.Clock;
-import io.s4.util.clock.EventClock;
-
-import java.io.File;
-import java.io.FileDescriptor;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringWriter;
-import java.lang.management.ManagementFactory;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
-import org.springframework.core.io.ClassPathResource;
-
-/**
- * 
- *
- */
-public class S4App {
-
-    String configType = "typical";
-    long seedTime = 0;
-    ApplicationContext appContext = null;
-    ApplicationContext adapterContext = null;
-    private String configBase;
-    boolean configPathsInitialized = false;
-    private String[] coreConfigFileUrls;
-    private Class testClass;
-    private String s4CoreConfFileName;
-    public static File DEFAULT_TEST_OUTPUT_DIR = new File(
-            System.getProperty("user.dir") + File.separator + "tmp");
-    public static File DEFAULT_STORAGE_DIR = new File(
-            DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
-                    + "storage");
-
-    public static String lockDirPath = System.getProperty("user.dir")
-            + File.separator + "tmp" + File.separator + "lock";
-
-    private S4App() {}
-    
-    public S4App(Class testClass, String s4CoreConfFileName) throws Exception {
-        this.testClass = testClass;
-        this.s4CoreConfFileName = s4CoreConfFileName;
-        initConfigPaths(testClass, s4CoreConfFileName);
-    }
-    /**
-     * @param args
-     * @throws Exception
-     */
-    public static void main(String[] args) throws Exception {
-        Class testClass = Class.forName(args[0]);
-        String s4CoreConfFile = args[1];
-        S4App app = new S4App(testClass, s4CoreConfFile);
-        S4TestCase.initS4Parameters();
-        app.initializeS4App();
-
-    }
-
-    /**
-     * Performs dependency injection and starts the S4 plaftform.
-     */
-    public void initializeS4App()
-            throws Exception {
-        initConfigPaths(testClass, s4CoreConfFileName);
-        ApplicationContext coreContext = null;
-
-        coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls,
-                coreContext);
-        ApplicationContext context = coreContext;
-
-        Clock clock = (Clock) context.getBean("clock");
-        if (clock instanceof EventClock && seedTime > 0) {
-            EventClock s4EventClock = (EventClock) clock;
-            s4EventClock.updateTime(seedTime);
-            System.out.println("Intializing event clock time with seed time "
-                    + s4EventClock.getCurrentTime());
-        }
-
-        PEContainer peContainer = (PEContainer) context.getBean("peContainer");
-
-        Watcher w = (Watcher) context.getBean("watcher");
-        w.setConfigFilename(configBase + s4CoreConfFileName);
-
-        // load extension modules
-        // String[] configFileNames = getModuleConfigFiles(extsHome, prop);
-        // if (configFileNames.length > 0) {
-        // String[] configFileUrls = new String[configFileNames.length];
-        // for (int i = 0; i < configFileNames.length; i++) {
-        // configFileUrls[i] = "file:" + configFileNames[i];
-        // }
-        // context = new FileSystemXmlApplicationContext(configFileUrls,
-        // context);
-        // }
-
-        // load application modules
-        String applicationConfigFileName = configBase + "app_conf.xml";
-        String[] configFileUrls = new String[] { "file:"
-                + applicationConfigFileName };
-        context = new FileSystemXmlApplicationContext(configFileUrls, context);
-        // attach any beans that implement ProcessingElement to the PE
-        // Container
-        String[] processingElementBeanNames = context
-                .getBeanNamesForType(AbstractPE.class);
-        for (String processingElementBeanName : processingElementBeanNames) {
-            Object bean = context.getBean(processingElementBeanName);
-            try {
-                Method getS4ClockMethod = bean.getClass().getMethod(
-                        "getClock");
-
-                if (getS4ClockMethod.getReturnType().equals(Clock.class)) {
-                    if (getS4ClockMethod.invoke(bean) == null) {
-                        Method setS4ClockMethod = bean.getClass().getMethod(
-                                "setClock", Clock.class);
-                        setS4ClockMethod.invoke(bean,
-                                coreContext.getBean("clock"));
-                    }
-                }
-                ((AbstractPE)bean).setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
-            } catch (NoSuchMethodException mnfe) {
-                // acceptable
-            }
-            System.out.println("Adding processing element with bean name "
-                    + processingElementBeanName + ", id "
-                    + ((AbstractPE) bean).getId());
-            peContainer.addProcessor((AbstractPE) bean);
-        }
-
-        appContext = context;
-    }
-    
-    
-
-    private void initConfigPaths(Class testClass, String s4CoreConfFileName)
-            throws IOException {
-        if (!configPathsInitialized) {
-            S4TestCase.initS4Parameters();
-            String testDir = testClass.getPackage().getName()
-                    .replace('.', File.separatorChar);
-
-            ClassPathResource propResource = new ClassPathResource(
-                    "s4_core.properties");
-            Properties prop = new Properties();
-            if (propResource.exists()) {
-                prop.load(propResource.getInputStream());
-            } else {
-                System.err
-                        .println("Unable to find s4_core.properties. It must be available in classpath");
-                Thread.dumpStack();
-                System.exit(12);
-            }
-
-            configBase = System.getProperty("user.dir") + File.separator
-                    + "src" + File.separator + "test" + File.separator + "java"
-                    + File.separator + testDir + File.separator;
-            String configPath = configBase + File.separatorChar
-                    + "wall_clock.xml";
-            List<String> coreConfigUrls = new ArrayList<String>();
-            coreConfigUrls.add(configPath);
-
-            // load core config xml
-            if (s4CoreConfFileName != null) {
-                // may be null for adapter
-                configPath = configBase + s4CoreConfFileName;
-                File configFile = new File(configPath);
-                if (!configFile.exists()) {
-                    System.err.printf(
-                            "S4 core config file %s does not exist\n",
-                            configPath);
-                    Thread.dumpStack();
-                    System.exit(13);
-                }
-                coreConfigUrls.add(configPath);
-            }
-            String[] coreConfigFiles = new String[coreConfigUrls.size()];
-            coreConfigUrls.toArray(coreConfigFiles);
-
-            coreConfigFileUrls = new String[coreConfigFiles.length];
-            for (int i = 0; i < coreConfigFiles.length; i++) {
-                coreConfigFileUrls[i] = "file:" + coreConfigFiles[i];
-            }
-            configPathsInitialized = true;
-
-        }
-    }
-    
-    public void destroy() {
-        ((FileSystemXmlApplicationContext)appContext).close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/S4TestCase.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/S4TestCase.java b/s4-core/src/test/java/io/s4/ft/S4TestCase.java
deleted file mode 100644
index 9d7b5ab..0000000
--- a/s4-core/src/test/java/io/s4/ft/S4TestCase.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package io.s4.ft;
-
-import io.s4.processor.AbstractPE;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Hashtable;
-import java.util.Map;
-
-import org.junit.BeforeClass;
-import org.springframework.context.ApplicationContext;
-
-public class S4TestCase {
-
-    String configType = "typical";
-    long seedTime = 0;
-    ApplicationContext appContext = null;
-    ApplicationContext adapterContext = null;
-    boolean configPathsInitialized = false;
-    public static File DEFAULT_TEST_OUTPUT_DIR = new File(
-            System.getProperty("user.dir") + File.separator + "tmp");
-    public static File DEFAULT_STORAGE_DIR = new File(
-            DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
-                    + "storage");
-    // use a static map to track PE instances
-    public static final Map<Object, AbstractPE> registeredPEs = new Hashtable<Object, AbstractPE>();
-
-    
-    @BeforeClass
-    public static void cleanLocks() {
-        TestUtils.cleanupTmpDirs();
-    }
-    
-
-    @BeforeClass
-    public static void initS4Parameters() throws IOException {
-    
-        System.setProperty("commlayer_mode", "static");
-        System.setProperty("commlayer.mode", "static");
-        System.setProperty("DequeueCount", "6");
-        System.setProperty("lock_dir", S4App.lockDirPath);
-        File lockDir = new File(S4App.lockDirPath);
-        if (!lockDir.exists()) {
-            if (!lockDir.mkdirs()) {
-                throw new RuntimeException("Cannot create directory: ["+lockDir.getAbsolutePath()+"]");
-            }
-        } else {
-            TestUtils.deleteDirectoryContents(lockDir);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/SimpleEventProducer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/SimpleEventProducer.java b/s4-core/src/test/java/io/s4/ft/SimpleEventProducer.java
deleted file mode 100644
index 002d181..0000000
--- a/s4-core/src/test/java/io/s4/ft/SimpleEventProducer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package io.s4.ft;
-
-import io.s4.collector.EventWrapper;
-import io.s4.listener.EventHandler;
-import io.s4.listener.EventProducer;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class SimpleEventProducer implements EventProducer {
-
-	private Set<io.s4.listener.EventHandler> handlers = new HashSet<io.s4.listener.EventHandler>();
-	private String streamName;
-
-	LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
-
-	public void init() {
-	}
-
-	@Override
-	public void addHandler(EventHandler handler) {
-		handlers.add(handler);
-
-	}
-
-	@Override
-	public boolean removeHandler(EventHandler handler) {
-		return handlers.remove(handler);
-	}
-
-	public void setStreamName(String streamName) {
-		this.streamName = streamName;
-	}
-
-	public String getStreamName() {
-		return streamName;
-	}
-
-	// TODO JSON-like stuff
-	public void produceEvent(String message) {
-		EventWrapper ew = new EventWrapper(streamName, message, null);
-		for (io.s4.listener.EventHandler handler : handlers) {
-			try {
-				handler.processEvent(ew);
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		}
-
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/ft/StatefulTestPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/ft/StatefulTestPE.java b/s4-core/src/test/java/io/s4/ft/StatefulTestPE.java
deleted file mode 100644
index b4af477..0000000
--- a/s4-core/src/test/java/io/s4/ft/StatefulTestPE.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package io.s4.ft;
-
-import io.s4.processor.AbstractPE;
-
-import java.io.File;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-
-public class StatefulTestPE extends AbstractPE implements Watcher {
-
-    String id;
-    String value1 = "";
-    String value2 = "";
-    transient ZooKeeper zk = null;
-    transient public static File DATA_FILE = new File(
-            System.getProperty("user.dir")
-            + File.separator + "tmp" + File.separator + "StatefulTestPE.data");;
-
-    @Override
-    public String getId() {
-        return id;
-    }
-
-    @Override
-    public void output() {
-        // TODO Auto-generated method stub
-
-    }
-
-    public void processEvent(KeyValue event) {
-        if (zk == null) {
-            try {
-                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        if (!S4TestCase.registeredPEs.containsKey(getSafeKeeperId())) {
-            S4TestCase.registeredPEs.put(getSafeKeeperId(), this);
-        }
-        try {
-
-            if ("value1".equals(event.getKey())) {
-                setValue1(event.getValue());
-                zk.create("/value1Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-            } else if ("value2".equals(event.getKey())) {
-                setValue2(event.getValue());
-                zk.create("/value2Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-            } else if ("initiateCheckpoint".equals(event.getKey())) {
-                initiateCheckpoint();
-            } else {
-                throw new RuntimeException("unidentified event: " + event);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public String getValue1() {
-        return value1;
-    }
-
-    public void setValue1(String value1) {
-        this.value1 = value1;
-        persistValues();
-    }
-
-    public String getValue2() {
-        return value2;
-    }
-
-    public void setValue2(String value2) {
-        this.value2 = value2;
-        persistValues();
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    protected void checkpoint() {
-        super.checkpoint();
-        try {
-            zk.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    // NOTE: we use a file as a simple way to keep track of changes
-    private void persistValues() {
-
-        if (DATA_FILE.exists()) {
-            if (!DATA_FILE.delete()) {
-                throw new RuntimeException("Cannot delete datafile "
-                        + DATA_FILE.getAbsolutePath());
-            }
-        }
-        try {
-            if (!DATA_FILE.createNewFile()) {
-                throw new RuntimeException("Cannot create datafile "
-                        + DATA_FILE.getAbsolutePath());
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Cannot create datafile "
-                    + DATA_FILE.getAbsolutePath());
-        }
-        try {
-            TestUtils.writeStringToFile("value1=" + value1 + " ; value2=" + value2,
-                    DATA_FILE);
-        } catch (IOException e) {
-            throw new RuntimeException("Cannot write to datafile "
-                    + DATA_FILE.getAbsolutePath());
-        }
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        // TODO Auto-generated method stub
-
-    }
-
-}