You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC

[21/50] [abbrv] git commit: Rename packages in preparation for move to Apache

Rename packages in preparation for move to Apache


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/2f93667d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/2f93667d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/2f93667d

Branch: refs/heads/dev
Commit: 2f93667d98ebbb00de75417ba4e9c289b7fc8e6f
Parents: 9adaa07
Author: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Authored: Sun Nov 20 21:22:54 2011 -0800
Committer: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Committed: Sun Nov 20 21:22:54 2011 -0800

----------------------------------------------------------------------
 .../java/io/s4/comm/core/CommEventCallback.java    |   24 -
 .../main/java/io/s4/comm/core/CommLayerState.java  |    5 -
 .../java/io/s4/comm/core/CommServiceFactory.java   |   74 ---
 .../main/java/io/s4/comm/core/DefaultWatcher.java  |  112 ----
 .../main/java/io/s4/comm/core/Deserializer.java    |   21 -
 .../main/java/io/s4/comm/core/GenericListener.java |   96 ----
 .../main/java/io/s4/comm/core/GenericSender.java   |  189 -------
 .../main/java/io/s4/comm/core/GenericSerDeser.java |   30 -
 .../main/java/io/s4/comm/core/ListenerProcess.java |   91 ---
 .../main/java/io/s4/comm/core/MulticastSender.java |   79 ---
 .../main/java/io/s4/comm/core/ProcessMonitor.java  |   33 --
 .../src/main/java/io/s4/comm/core/SendMode.java    |   20 -
 .../main/java/io/s4/comm/core/SenderProcess.java   |  135 -----
 .../src/main/java/io/s4/comm/core/Serializer.java  |   21 -
 .../src/main/java/io/s4/comm/core/TaskManager.java |   24 -
 .../java/io/s4/comm/file/StaticProcessMonitor.java |   78 ---
 .../java/io/s4/comm/file/StaticTaskManager.java    |  191 -------
 .../java/io/s4/comm/test/ProcessMonitorTest.java   |   79 ---
 .../main/java/io/s4/comm/test/TaskManagerTest.java |   82 ---
 .../java/io/s4/comm/test/TestTaskSetupApp.java     |  122 ----
 .../src/main/java/io/s4/comm/test/ZkQueueTest.java |   57 --
 .../main/java/io/s4/comm/tools/TaskSetupApp.java   |   86 ---
 .../src/main/java/io/s4/comm/util/CommUtil.java    |   38 --
 .../main/java/io/s4/comm/util/ConfigParser.java    |  439 ---------------
 .../src/main/java/io/s4/comm/util/ConfigUtils.java |   58 --
 s4-comm/src/main/java/io/s4/comm/util/IOUtil.java  |   74 ---
 .../src/main/java/io/s4/comm/util/JSONUtil.java    |  285 ----------
 .../src/main/java/io/s4/comm/util/SystemUtils.java |   39 --
 .../src/main/java/io/s4/comm/zk/ThreadTest.java    |   64 ---
 .../main/java/io/s4/comm/zk/ZkProcessMonitor.java  |  147 -----
 s4-comm/src/main/java/io/s4/comm/zk/ZkQueue.java   |  117 ----
 .../src/main/java/io/s4/comm/zk/ZkTaskManager.java |  153 -----
 .../src/main/java/io/s4/comm/zk/ZkTaskSetup.java   |  282 ---------
 s4-comm/src/main/java/io/s4/comm/zk/ZkUtil.java    |  144 -----
 .../org/apache/s4/comm/core/CommEventCallback.java |   24 +
 .../org/apache/s4/comm/core/CommLayerState.java    |    5 +
 .../apache/s4/comm/core/CommServiceFactory.java    |   74 +++
 .../org/apache/s4/comm/core/DefaultWatcher.java    |  112 ++++
 .../java/org/apache/s4/comm/core/Deserializer.java |   21 +
 .../org/apache/s4/comm/core/GenericListener.java   |   96 ++++
 .../org/apache/s4/comm/core/GenericSender.java     |  189 +++++++
 .../org/apache/s4/comm/core/GenericSerDeser.java   |   30 +
 .../org/apache/s4/comm/core/ListenerProcess.java   |   91 +++
 .../org/apache/s4/comm/core/MulticastSender.java   |   79 +++
 .../org/apache/s4/comm/core/ProcessMonitor.java    |   33 ++
 .../java/org/apache/s4/comm/core/SendMode.java     |   20 +
 .../org/apache/s4/comm/core/SenderProcess.java     |  135 +++++
 .../java/org/apache/s4/comm/core/Serializer.java   |   21 +
 .../java/org/apache/s4/comm/core/TaskManager.java  |   24 +
 .../apache/s4/comm/file/StaticProcessMonitor.java  |   78 +++
 .../org/apache/s4/comm/file/StaticTaskManager.java |  191 +++++++
 .../apache/s4/comm/test/ProcessMonitorTest.java    |   79 +++
 .../org/apache/s4/comm/test/TaskManagerTest.java   |   82 +++
 .../org/apache/s4/comm/test/TestTaskSetupApp.java  |  122 ++++
 .../java/org/apache/s4/comm/test/ZkQueueTest.java  |   57 ++
 .../org/apache/s4/comm/tools/TaskSetupApp.java     |   86 +++
 .../java/org/apache/s4/comm/util/CommUtil.java     |   38 ++
 .../java/org/apache/s4/comm/util/ConfigParser.java |  439 +++++++++++++++
 .../java/org/apache/s4/comm/util/ConfigUtils.java  |   58 ++
 .../main/java/org/apache/s4/comm/util/IOUtil.java  |   74 +++
 .../java/org/apache/s4/comm/util/JSONUtil.java     |  285 ++++++++++
 .../java/org/apache/s4/comm/util/SystemUtils.java  |   39 ++
 .../java/org/apache/s4/comm/zk/ThreadTest.java     |   64 +++
 .../org/apache/s4/comm/zk/ZkProcessMonitor.java    |  147 +++++
 .../main/java/org/apache/s4/comm/zk/ZkQueue.java   |  117 ++++
 .../java/org/apache/s4/comm/zk/ZkTaskManager.java  |  153 +++++
 .../java/org/apache/s4/comm/zk/ZkTaskSetup.java    |  282 +++++++++
 .../main/java/org/apache/s4/comm/zk/ZkUtil.java    |  144 +++++
 s4-core/src/main/java/org/apache/s4/MainApp.java   |    2 +-
 .../org/apache/s4/client/util/ObjectBuilder.java   |    4 +-
 .../dispatcher/partitioner/DefaultPartitioner.java |    4 +-
 .../java/org/apache/s4/processor/PEContainer.java  |    2 +-
 .../src/test/java/io/s4/ft/CheckpointingTest.java  |  108 ----
 s4-core/src/test/java/io/s4/ft/EventGenerator.java |   63 --
 s4-core/src/test/java/io/s4/ft/KeyValue.java       |   33 --
 s4-core/src/test/java/io/s4/ft/RecoveryTest.java   |  113 ----
 s4-core/src/test/java/io/s4/ft/S4App.java          |  201 -------
 s4-core/src/test/java/io/s4/ft/S4TestCase.java     |   52 --
 .../test/java/io/s4/ft/SimpleEventProducer.java    |   54 --
 s4-core/src/test/java/io/s4/ft/StatefulTestPE.java |  136 -----
 .../test/java/io/s4/ft/TestRedisStateStorage.java  |  146 -----
 s4-core/src/test/java/io/s4/ft/TestUtils.java      |  399 -------------
 s4-core/src/test/java/io/s4/ft/adapter.properties  |    2 -
 s4-core/src/test/java/io/s4/ft/adapter_conf.xml    |   40 --
 .../src/test/java/io/s4/ft/app_adapter_conf.xml    |   14 -
 s4-core/src/test/java/io/s4/ft/app_conf.xml        |   26 -
 .../test/java/io/s4/ft/s4_core_conf_bk_backend.xml |  198 -------
 .../test/java/io/s4/ft/s4_core_conf_fs_backend.xml |  196 -------
 s4-core/src/test/java/io/s4/ft/wall_clock.xml      |    6 -
 .../java/io/s4/ft/wordcount/FTWordCountTest.java   |  159 ------
 .../src/test/java/io/s4/ft/wordcount/app_conf.xml  |   94 ---
 .../io/s4/ft/wordcount/s4_core_conf_fs_backend.xml |  196 -------
 .../s4/ft/wordcount/s4_core_conf_redis_backend.xml |  198 -------
 .../test/java/io/s4/ft/wordcount/wall_clock.xml    |    6 -
 s4-core/src/test/java/io/s4/processor/MockPE.java  |   29 -
 .../java/io/s4/processor/TestPrototypeWrapper.java |   31 -
 s4-core/src/test/java/io/s4/wordcount/Word.java    |   22 -
 .../test/java/io/s4/wordcount/WordClassifier.java  |  109 ----
 .../src/test/java/io/s4/wordcount/WordCount.java   |   43 --
 .../test/java/io/s4/wordcount/WordCountTest.java   |   83 ---
 .../src/test/java/io/s4/wordcount/WordCounter.java |   60 --
 .../test/java/io/s4/wordcount/WordSplitter.java    |   61 --
 s4-core/src/test/java/io/s4/wordcount/app_conf.xml |   92 ---
 .../src/test/java/io/s4/wordcount/s4_core_conf.xml |  194 -------
 .../src/test/java/io/s4/wordcount/wall_clock.xml   |    6 -
 .../java/org/apache/s4/ft/CheckpointingTest.java   |  108 ++++
 .../test/java/org/apache/s4/ft/EventGenerator.java |   63 ++
 .../src/test/java/org/apache/s4/ft/KeyValue.java   |   33 ++
 .../test/java/org/apache/s4/ft/RecoveryTest.java   |  113 ++++
 s4-core/src/test/java/org/apache/s4/ft/S4App.java  |  201 +++++++
 .../src/test/java/org/apache/s4/ft/S4TestCase.java |   52 ++
 .../java/org/apache/s4/ft/SimpleEventProducer.java |   54 ++
 .../test/java/org/apache/s4/ft/StatefulTestPE.java |  136 +++++
 .../org/apache/s4/ft/TestRedisStateStorage.java    |  146 +++++
 .../src/test/java/org/apache/s4/ft/TestUtils.java  |  399 +++++++++++++
 .../test/java/org/apache/s4/ft/adapter.properties  |    2 +
 .../test/java/org/apache/s4/ft/adapter_conf.xml    |   40 ++
 .../java/org/apache/s4/ft/app_adapter_conf.xml     |   14 +
 .../src/test/java/org/apache/s4/ft/app_conf.xml    |   26 +
 .../org/apache/s4/ft/s4_core_conf_bk_backend.xml   |  198 +++++++
 .../org/apache/s4/ft/s4_core_conf_fs_backend.xml   |  196 +++++++
 .../src/test/java/org/apache/s4/ft/wall_clock.xml  |    6 +
 .../apache/s4/ft/wordcount/FTWordCountTest.java    |  159 ++++++
 .../java/org/apache/s4/ft/wordcount/app_conf.xml   |   94 +++
 .../s4/ft/wordcount/s4_core_conf_fs_backend.xml    |  196 +++++++
 .../s4/ft/wordcount/s4_core_conf_redis_backend.xml |  198 +++++++
 .../java/org/apache/s4/ft/wordcount/wall_clock.xml |    6 +
 .../test/java/org/apache/s4/processor/MockPE.java  |   29 +
 .../apache/s4/processor/TestPrototypeWrapper.java  |   31 +
 .../test/java/org/apache/s4/wordcount/Word.java    |   22 +
 .../org/apache/s4/wordcount/WordClassifier.java    |  109 ++++
 .../java/org/apache/s4/wordcount/WordCount.java    |   43 ++
 .../org/apache/s4/wordcount/WordCountTest.java     |   83 +++
 .../java/org/apache/s4/wordcount/WordCounter.java  |   60 ++
 .../java/org/apache/s4/wordcount/WordSplitter.java |   61 ++
 .../test/java/org/apache/s4/wordcount/app_conf.xml |   92 +++
 .../java/org/apache/s4/wordcount/s4_core_conf.xml  |  194 +++++++
 .../java/org/apache/s4/wordcount/wall_clock.xml    |    6 +
 138 files changed, 6665 insertions(+), 6665 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/CommEventCallback.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/CommEventCallback.java b/s4-comm/src/main/java/io/s4/comm/core/CommEventCallback.java
deleted file mode 100644
index a7a50f7..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/CommEventCallback.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import java.util.Map;
-
-public interface CommEventCallback {
-
-    public void handleCallback(Map<String, Object> eventData);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/CommLayerState.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/CommLayerState.java b/s4-comm/src/main/java/io/s4/comm/core/CommLayerState.java
deleted file mode 100644
index add6284..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/CommLayerState.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package io.s4.comm.core;
-
-public enum CommLayerState {
-    INITIALIZED, BROKEN
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/CommServiceFactory.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/CommServiceFactory.java b/s4-comm/src/main/java/io/s4/comm/core/CommServiceFactory.java
deleted file mode 100644
index 41900e3..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/CommServiceFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import io.s4.comm.file.StaticProcessMonitor;
-import io.s4.comm.file.StaticTaskManager;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.zk.ZkProcessMonitor;
-import io.s4.comm.zk.ZkTaskManager;
-
-import org.apache.log4j.Logger;
-
-/**
- * Common Factory class to provide appropriate implementations
- * 
- * @author kishoreg
- * 
- */
-public class CommServiceFactory {
-    private static Logger logger = Logger.getLogger(CommServiceFactory.class);
-
-    public static TaskManager getTaskManager(String zkaddress,
-            String clusterName, ClusterType clusterType,
-            CommEventCallback callbackHandler) {
-        String mode = System.getProperty("commlayer.mode");
-        TaskManager taskManager = null;
-        if (mode != null && mode.equalsIgnoreCase("static")) {
-            logger.info("Comm layer mode is set to static");
-            taskManager = new StaticTaskManager(zkaddress,
-                                                clusterName,
-                                                clusterType,
-                                                callbackHandler);
-        } else {
-            taskManager = new ZkTaskManager(zkaddress,
-                                            clusterName,
-                                            clusterType,
-                                            callbackHandler);
-        }
-
-        return taskManager;
-    }
-
-    public static ProcessMonitor getProcessMonitor(String zkaddress,
-            String clusterName, CommEventCallback callbackHandler) {
-        ProcessMonitor processMonitor = null;
-        String mode = System.getProperty("commlayer.mode");
-        if (mode != null && mode.equalsIgnoreCase("static")) {
-            logger.info("Comm layer mode is set to static");
-            processMonitor = new StaticProcessMonitor(zkaddress,
-                                                      clusterName,
-                                                      ClusterType.S4);
-        } else {
-            processMonitor = new ZkProcessMonitor(zkaddress,
-                                                  clusterName,
-                                                  ClusterType.S4,
-                                                  callbackHandler);
-        }
-        return processMonitor;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/DefaultWatcher.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/DefaultWatcher.java b/s4-comm/src/main/java/io/s4/comm/core/DefaultWatcher.java
deleted file mode 100644
index 93c11a4..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/DefaultWatcher.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-public class DefaultWatcher implements Watcher {
-
-    public static List<KeeperState> interestingStates = new ArrayList<KeeperState>();
-    static {
-        interestingStates.add(KeeperState.Expired);
-        interestingStates.add(KeeperState.SyncConnected);
-    }
-    public static Logger logger = Logger.getLogger(DefaultWatcher.class);
-    protected ZooKeeper zk = null;
-    protected Integer mutex;
-    protected String root;
-    protected WatchedEvent currentEvent;
-    protected CommEventCallback callbackHandler;
-    private String zkAddress;
-    volatile boolean connected = false;
-
-    protected DefaultWatcher(String address) {
-        this(address, null);
-    }
-
-    protected DefaultWatcher(String address, CommEventCallback callbackHandler) {
-        this.zkAddress = address;
-        this.callbackHandler = callbackHandler;
-        if (zk == null) {
-            try {
-                logger.info("Connecting to  zookeeper server:" + address);
-                String sTimeout = System.getProperty("zk.session.timeout");
-                System.out.println("sTimeout=" + sTimeout);
-                int timeout = 30000;
-                if (sTimeout != null) {
-                    try {
-                        timeout = Integer.parseInt(sTimeout);
-                    } catch (Exception e) {
-                        // ignore will use default
-                    }
-                }
-                mutex = new Integer(-1);
-                synchronized (mutex) {
-                    zk = new ZooKeeper(address, timeout, this);
-                    while (!connected) {
-                        logger.info("Waiting for connection to be established ");
-                        mutex.wait();
-                    }
-                }
-                logger.info("Connected to zookeeper with sessionid: "
-                        + zk.getSessionId() + " and session timeout(ms): "
-                        + timeout);
-
-            } catch (Exception e) {
-                logger.error("Failed to connect to zookeeper:" + e.getMessage(),
-                             e);
-                zk = null;
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    synchronized public void process(WatchedEvent event) {
-        logger.info("Received zk event:" + event);
-        synchronized (mutex) {
-            currentEvent = event;
-            if (event.getState() == KeeperState.SyncConnected) {
-                connected = true;
-            }
-            if (callbackHandler != null
-                    && interestingStates.contains(event.getState())) {
-                Map<String, Object> eventData = new HashMap<String, Object>();
-                if (event.getState() == KeeperState.SyncConnected) {
-                    eventData.put("state", CommLayerState.INITIALIZED);
-                } else if (event.getState() == KeeperState.Expired) {
-                    eventData.put("state", CommLayerState.BROKEN);
-                }
-                eventData.put("source", event);
-                callbackHandler.handleCallback(eventData);
-            }
-            mutex.notify();
-        }
-    }
-
-    public String getZkAddress() {
-        return zkAddress;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/Deserializer.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/Deserializer.java b/s4-comm/src/main/java/io/s4/comm/core/Deserializer.java
deleted file mode 100644
index 10f335b..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/Deserializer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-public interface Deserializer {
-
-    public Object deserialize(byte[] buffer);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/GenericListener.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/GenericListener.java b/s4-comm/src/main/java/io/s4/comm/core/GenericListener.java
deleted file mode 100644
index 79b7ba8..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/GenericListener.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class GenericListener {
-    private static Logger logger = Logger.getLogger(GenericListener.class);
-    private String zkAddress;
-    private DatagramSocket socket;
-    int BUFFER_LENGTH = 65507;
-    private DatagramPacket dgram;
-    private byte[] bs;
-    final Deserializer deserializer;
-
-    public GenericListener(String zkaddress, String appName,
-            Object listenerConfig) {
-        this(zkaddress, appName, listenerConfig, new GenericSerDeser());
-    }
-
-    public GenericListener(String zkaddress, String appName,
-            Object listenerConfig, Deserializer deserializer) {
-        this.zkAddress = zkAddress;
-        this.deserializer = deserializer;
-        try {
-            Map<String, String> map = (Map<String, String>) listenerConfig;
-            String mode = map.get("mode");
-            int port = Integer.parseInt(map.get("port"));
-            if (mode.equals("multicast")) {
-                InetAddress inetAddress = InetAddress.getByName(map.get("channel"));
-                socket = new MulticastSocket(port);
-                ((MulticastSocket) socket).joinGroup(inetAddress);
-            }
-            if (mode.equals("unicast")) {
-                socket = new DatagramSocket(port);
-            }
-            String udpBufferSize = System.getProperty("udp.buffer.size");
-            if (udpBufferSize == null) {
-                udpBufferSize = "4194302";
-            }
-            socket.setReceiveBufferSize(Integer.parseInt(udpBufferSize));
-            bs = new byte[BUFFER_LENGTH];
-            dgram = new DatagramPacket(bs, bs.length);
-        } catch (IOException e) {
-            logger.error("error creating listener", e);
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public Object receive() {
-        try {
-            socket.receive(dgram);
-            byte[] data = new byte[dgram.getLength()];
-            System.arraycopy(dgram.getData(),
-                             dgram.getOffset(),
-                             data,
-                             0,
-                             data.length);
-            Object object = deserializer.deserialize(data);
-            dgram.setLength(BUFFER_LENGTH);
-            return object;
-        } catch (IOException e) {
-            logger.error("error receiving message", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /*
-     * There is nothing much to do for multicast and unicast
-     */
-    public void start() {
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/GenericSender.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/GenericSender.java b/s4-comm/src/main/java/io/s4/comm/core/GenericSender.java
deleted file mode 100644
index 1d223d1..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/GenericSender.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.List;
-import java.util.Map;
-import org.apache.log4j.Logger;
-
-public class GenericSender {
-    static Logger logger = Logger.getLogger(GenericSender.class);
-    Map<String, String> map;
-    private DatagramSocket socket;
-    private final String zkAddress;
-    ProcessMonitor listenerMonitor;
-    int rotationCounter = 0;
-    private final Serializer serializer;
-    private final int listenerTaskCount;
-    private CommEventCallback callbackHandler;
-    private String mode;
-
-    public GenericSender(String zkAddress, String appName,
-            Object senderConfigData) {
-        this(zkAddress, appName, appName, senderConfigData);
-    }
-
-    @SuppressWarnings("unchecked")
-    public GenericSender(String zkAddress, String adapterClusterName,
-            String s4ClusterName, Object senderConfigData, Serializer serializer) {
-        this.zkAddress = zkAddress;
-        this.serializer = serializer;
-        try {
-            map = (Map<String, String>) senderConfigData;
-            mode = map.get("mode");
-            if (mode.equals("multicast")) {
-                socket = new MulticastSocket();
-            }
-            if (mode.equals("unicast")) {
-                socket = new DatagramSocket();
-            }
-            listenerMonitor = CommServiceFactory.getProcessMonitor(this.zkAddress,
-                                                                   s4ClusterName,
-                                                                   callbackHandler);
-            if (callbackHandler != null) {
-                // listenerMonitor.setCallbackHandler(callbackHandler);
-            }
-            listenerMonitor.monitor();
-            this.listenerTaskCount = listenerMonitor.getTaskCount();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public GenericSender(String zkAddress, String senderAppName,
-            String listenerAppName, Object senderConfigData) {
-        this(zkAddress,
-             senderAppName,
-             listenerAppName,
-             senderConfigData,
-             new GenericSerDeser());
-    }
-
-    /**
-     * This method will send the data to receivers in a round robin fashion
-     * 
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    @SuppressWarnings("unchecked")
-    public boolean send(Object data) {
-        try {
-            List<Object> destinationList = listenerMonitor.getDestinationList();
-            if (destinationList == null || destinationList.size() == 0) {
-                logger.error("Failed to send message: No destination available"
-                        + data);
-                return false;
-            }
-            byte[] byteBuffer = serializer.serialize(data);
-            rotationCounter = rotationCounter + 1;
-
-            int index = rotationCounter % destinationList.size();
-            Map<String, String> dest = (Map<String, String>) destinationList.get(Math.abs(index));
-            InetAddress inetAddress;
-            int port;
-            if (mode.equals("unicast")) {
-                inetAddress = InetAddress.getByName(dest.get("address"));
-                port = Integer.parseInt((dest.get("port")));
-            } else if (mode.equals("multicast")) {
-                inetAddress = InetAddress.getByName(dest.get("channel"));
-                port = Integer.parseInt((dest.get("port")));
-            } else {
-                logger.error("Failed to send message unknown mode: " + mode);
-                return false;
-            }
-            DatagramPacket dp = new DatagramPacket(byteBuffer,
-                                                   byteBuffer.length,
-                                                   inetAddress,
-                                                   port);
-            socket.send(dp);
-        } catch (IOException e) {
-            // add retry
-            logger.error("Failed to send message: " + data, e);
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * This will send the data to a specific channel/receiver/partition
-     * 
-     * @param partition
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    @SuppressWarnings("unchecked")
-    public boolean sendToPartition(int partition, Object data) {
-        try {
-            byte[] byteBuffer = serializer.serialize(data);
-            Map<Integer, Object> destinationMap = listenerMonitor.getDestinationMap();
-            if (logger.isDebugEnabled()) {
-                logger.debug("Destination Map:" + destinationMap);
-            }
-            Map<String, String> dest = (Map<String, String>) destinationMap.get(partition);
-            if (dest != null) {
-                InetAddress inetAddress = InetAddress.getByName(dest.get("address"));
-                int port = Integer.parseInt((dest.get("port")));
-                DatagramPacket dp = new DatagramPacket(byteBuffer,
-                                                       byteBuffer.length,
-                                                       inetAddress,
-                                                       port);
-                socket.send(dp);
-            } else {
-                logger.warn("Destination not available for partition:"
-                        + partition + " Skipping message:" + data);
-                return false;
-            }
-        } catch (IOException e) {
-            // add retry
-            logger.error("Failed to send message: " + data, e);
-            return false;
-        }
-
-        return true;
-
-    }
-
-    /**
-     * compute partition using hashcode and send to appropriate partition
-     * 
-     * @param hashcode
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean sendUsingHashCode(int hashcode, Object data) {
-        int partition = (hashcode & Integer.MAX_VALUE) % listenerTaskCount;
-        return sendToPartition(partition, data);
-
-    }
-
-    public CommEventCallback getCallbackHandler() {
-        return callbackHandler;
-    }
-
-    public void setCallbackHandler(CommEventCallback callbackHandler) {
-        this.callbackHandler = callbackHandler;
-    }
-
-    public int getListenerTaskCount() {
-        return listenerTaskCount;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/GenericSerDeser.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/GenericSerDeser.java b/s4-comm/src/main/java/io/s4/comm/core/GenericSerDeser.java
deleted file mode 100644
index c1dc0bc..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/GenericSerDeser.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import io.s4.comm.util.IOUtil;
-
-public class GenericSerDeser implements Serializer, Deserializer {
-
-    public byte[] serialize(Object obj) {
-        return IOUtil.serializeToBytes(obj);
-    }
-
-    public Object deserialize(byte[] buffer) {
-        return IOUtil.deserializeToObject(buffer);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java b/s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java
deleted file mode 100644
index c00e5cc..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class ListenerProcess {
-    static Logger logger = Logger.getLogger(ListenerProcess.class);
-    private final String zkaddress;
-    private final String clusterName;
-    private String listenerRoot;
-    private GenericListener genericListener;
-    private Deserializer deserializer;
-    private CommEventCallback callbackHandler;
-
-    public ListenerProcess(String zkaddress, String clusterName) {
-        this.zkaddress = zkaddress;
-        this.clusterName = clusterName;
-    }
-
-    /**
-     * This will be a blocking call and will wait until it gets a task
-     * 
-     * @return listener configuration
-     */
-    public Object acquireTaskAndCreateListener(Map<String, String> map) {
-        TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
-                                                                clusterName,
-                                                                ClusterType.S4,
-                                                                callbackHandler);
-        logger.info("Waiting for task");
-        Object listenerConfig = manager.acquireTask(map);
-        createListenerFromConfig(listenerConfig);
-        return listenerConfig;
-    }
-
-    public void createListenerFromConfig(Object listenerConfig) {
-        logger.info("Starting listener with config: " + listenerConfig);
-        if (deserializer != null) {
-            genericListener = new GenericListener(zkaddress,
-                                                  clusterName,
-                                                  listenerConfig,
-                                                  deserializer);
-        } else {
-            genericListener = new GenericListener(zkaddress,
-                                                  clusterName,
-                                                  listenerConfig);
-        }
-        genericListener.start();
-
-    }
-
-    public Deserializer getDeserializer() {
-        return deserializer;
-    }
-
-    public void setDeserializer(Deserializer deserializer) {
-        this.deserializer = deserializer;
-    }
-
-    public Object listen() {
-        return genericListener.receive();
-    }
-
-    public CommEventCallback getCallbackHandler() {
-        return callbackHandler;
-    }
-
-    public void setCallbackHandler(CommEventCallback callbackHandler) {
-        this.callbackHandler = callbackHandler;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java b/s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java
deleted file mode 100644
index fbed170..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import io.s4.comm.util.IOUtil;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.Map;
-
-public class MulticastSender {
-
-    Map<String, String> map;
-    private MulticastSocket ms;
-    private InetAddress inetAddress;
-    private int port;
-
-    @SuppressWarnings("unchecked")
-    public MulticastSender(Object senderConfigData) {
-        try {
-            map = (Map<String, String>) senderConfigData;
-            ms = new MulticastSocket();
-            inetAddress = InetAddress.getByName(map.get("multicast.address"));
-            ms.joinGroup(inetAddress);
-            this.port = Integer.parseInt(map.get("multicast.port"));
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * This method will send the data to receivers in a round robin fashion
-     * 
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean send(Object data) {
-        try {
-            byte[] byteBuffer = IOUtil.serializeToBytes(data);
-            DatagramPacket dp = new DatagramPacket(byteBuffer,
-                                                   byteBuffer.length,
-                                                   inetAddress,
-                                                   port);
-            ms.send(dp);
-        } catch (IOException e) {
-            // add retry
-            e.printStackTrace();
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * This will send the data to a specific channel/receiver
-     * 
-     * @param partition
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean send(int partition, Object data) {
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/ProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/ProcessMonitor.java b/s4-comm/src/main/java/io/s4/comm/core/ProcessMonitor.java
deleted file mode 100644
index b73df8f..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/ProcessMonitor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import java.util.List;
-import java.util.Map;
-
-public interface ProcessMonitor {
-
-    // void setCallbackHandler(CommEventCallback callbackHandler);
-
-    void monitor();
-
-    List<Object> getDestinationList();
-
-    Map<Integer, Object> getDestinationMap();
-
-    int getTaskCount();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/SendMode.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/SendMode.java b/s4-comm/src/main/java/io/s4/comm/core/SendMode.java
deleted file mode 100644
index 41a59cf..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/SendMode.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-public enum SendMode {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java b/s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java
deleted file mode 100644
index 0e20715..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.Map;
-
-public class SenderProcess {
-    protected final String zkaddress;
-    protected final String adapterClusterName;
-    protected final String s4ClusterName;
-    protected Serializer serializer;
-    protected CommEventCallback callbackHandler;
-
-    public SenderProcess(String zkaddress, String clusterName) {
-        this(zkaddress, clusterName, clusterName);
-    }
-
-    public SenderProcess(String zkaddress, String adapterClusterName,
-            String s4ClusterName) {
-        this.zkaddress = zkaddress;
-        this.adapterClusterName = adapterClusterName;
-        this.s4ClusterName = s4ClusterName;
-    }
-
-    public void setSerializer(Serializer serializer) {
-        this.serializer = serializer;
-    }
-
-    public CommEventCallback getCallbackHandler() {
-        return callbackHandler;
-    }
-
-    public void setCallbackHandler(CommEventCallback callbackHandler) {
-        this.callbackHandler = callbackHandler;
-    }
-
-    protected GenericSender genericSender;
-
-    /**
-     * This will be a blocking call and will wait until it gets a task
-     * 
-     * @return senderConfig object, currently its map
-     */
-
-    public Object acquireTaskAndCreateSender(Map<String, String> map) {
-        TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
-                                                                adapterClusterName,
-                                                                ClusterType.ADAPTER,
-                                                                callbackHandler);
-        if (callbackHandler != null) {
-            // manager.setCallbackHandler(callbackHandler);
-        }
-        Object senderConfig = manager.acquireTask(map);
-        createSenderFromConfig(senderConfig);
-        return senderConfig;
-    }
-
-    public void createSenderFromConfig(Object senderConfig) {
-        if (serializer != null) {
-            this.genericSender = new GenericSender(zkaddress,
-                                                   adapterClusterName,
-                                                   s4ClusterName,
-                                                   senderConfig,
-                                                   serializer);
-        } else {
-            this.genericSender = new GenericSender(zkaddress,
-                                                   adapterClusterName,
-                                                   s4ClusterName,
-                                                   senderConfig);
-        }
-        if (callbackHandler != null) {
-            this.genericSender.setCallbackHandler(callbackHandler);
-        }
-
-    }
-
-    /**
-     * This method will send the data to receivers in a round robin fashion
-     * 
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean send(Object data) {
-        return genericSender.send(data);
-    }
-
-    /**
-     * This will send the data to a specific channel/receiver/partition
-     * 
-     * @param partition
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-    public boolean sendToPartition(int partition, Object data) {
-        return genericSender.sendToPartition(partition, data);
-    }
-
-    /**
-     * compute partition using hashcode and send to appropriate partition
-     * 
-     * @param hashcode
-     * @param data
-     * @return true if data was successfully sent, false otherwise
-     */
-
-    public boolean sendUsingHashCode(int hashcode, Object data) {
-        return genericSender.sendUsingHashCode(hashcode, data);
-    }
-
-    /**
-     * Returns the number of partitions on the receiver app side TODO: Currently
-     * it returns the number of tasks on the listener side. It works for now
-     * since numofPartitions=taskCount
-     */
-
-    public int getNumOfPartitions() {
-        return genericSender.getListenerTaskCount();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/Serializer.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/Serializer.java b/s4-comm/src/main/java/io/s4/comm/core/Serializer.java
deleted file mode 100644
index a06f3f2..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/Serializer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-public interface Serializer {
-
-    public byte[] serialize(Object obj);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/TaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/TaskManager.java b/s4-comm/src/main/java/io/s4/comm/core/TaskManager.java
deleted file mode 100644
index 1082197..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/TaskManager.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.core;
-
-import java.util.Map;
-
-public interface TaskManager {
-
-    Object acquireTask(Map<String, String> customTaskData);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/file/StaticProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/file/StaticProcessMonitor.java b/s4-comm/src/main/java/io/s4/comm/file/StaticProcessMonitor.java
deleted file mode 100644
index f877d57..0000000
--- a/s4-comm/src/main/java/io/s4/comm/file/StaticProcessMonitor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.file;
-
-import io.s4.comm.core.ProcessMonitor;
-import io.s4.comm.util.ConfigUtils;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class StaticProcessMonitor implements ProcessMonitor {
-    static Logger logger = Logger.getLogger(StaticProcessMonitor.class);
-    private List<Object> destinationList = new ArrayList<Object>();
-    private Map<Integer, Object> destinationMap = new HashMap<Integer, Object>();
-    private int taskCount;
-    private final String clusterName;
-    private final ClusterType clusterType;
-
-    public StaticProcessMonitor(String address, String clusterName,
-            ClusterType clusterType) {
-        this.clusterName = clusterName;
-        this.clusterType = clusterType;
-    }
-
-    public void monitor() {
-        readConfig();
-    }
-
-    private void readConfig() {
-        List<Map<String, String>> processList = ConfigUtils.readConfig("clusters.xml",
-                                                                             clusterName,
-                                                                             clusterType,
-                                                                             true);
-        for (Map<String, String> processMap : processList) {
-            destinationList.add(processMap);
-            String key = (String) processMap.get("partition");
-            if (key != null) {
-                destinationMap.put(Integer.parseInt(key), processMap);
-            }
-        }
-        taskCount = destinationList.size();
-        logger.info("Destination List: " + destinationList);
-        logger.info("Destination Map: " + destinationMap);
-        logger.info("TaskCount: " + taskCount);
-    }
-
-    public List<Object> getDestinationList() {
-        return destinationList;
-    }
-
-    public Map<Integer, Object> getDestinationMap() {
-        return destinationMap;
-    }
-
-    @Override
-    public int getTaskCount() {
-        return taskCount;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java b/s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java
deleted file mode 100644
index 3c52258..0000000
--- a/s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.file;
-
-import io.s4.comm.core.CommEventCallback;
-import io.s4.comm.core.CommLayerState;
-import io.s4.comm.core.TaskManager;
-import io.s4.comm.util.ConfigUtils;
-import io.s4.comm.util.SystemUtils;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.net.InetAddress;
-import java.nio.channels.FileLock;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-public class StaticTaskManager implements TaskManager {
-    static Logger logger = Logger.getLogger(StaticTaskManager.class);
-    Set<Map<String, String>> processSet = new HashSet<Map<String, String>>();
-    private final String clusterName;
-    private final ClusterType clusterType;
-
-    /**
-     * Constructor of TaskManager
-     * 
-     * @param address
-     * @param clusterName
-     */
-    public StaticTaskManager(String address, String clusterName,
-            ClusterType clusterType, CommEventCallback callbackHandler) {
-        this.clusterName = clusterName;
-        this.clusterType = clusterType;
-        // read the configuration file
-        readStaticConfig();
-        if (callbackHandler != null) {
-            Map<String, Object> eventData = new HashMap<String, Object>();
-            eventData.put("state", CommLayerState.INITIALIZED);
-            callbackHandler.handleCallback(eventData);
-        }
-    }
-
-    private void readStaticConfig() {
-        // It should be available in classpath
-        List<Map<String, String>> processList = ConfigUtils.readConfig("clusters.xml",
-                                                                             clusterName,
-                                                                             clusterType,
-                                                                             true);
-
-        processSet.addAll(processList);
-    }
-
-    /**
-     * Will clean up taskList Node and process List Node
-     */
-    public boolean cleanUp() {
-        throw new UnsupportedOperationException("cleanUp Not supported in red button mode");
-    }
-
-    /**
-     * Creates task nodes.
-     * 
-     * @param numTasks
-     * @param data
-     */
-    public void setUpTasks(int numTasks, Object[] data) {
-        throw new UnsupportedOperationException("setUpTasks Not supported in red button mode");
-    }
-
-    /**
-     * This will block the process thread from starting the task, when it is
-     * unblocked it will return the data stored in the task node. This data can
-     * be used by the This call assumes that the tasks are already set up
-     * 
-     * @return Object containing data related to the task
-     */
-    @Override
-    public Object acquireTask(Map<String, String> customTaskData) {
-        while (true) {
-            try {
-                for (Map<String, String> processConfig : processSet) {
-                    boolean processAvailable = canTakeupProcess(processConfig);
-                    logger.info("processAvailable:" + processAvailable);
-                    if (processAvailable) {
-                        boolean success = takeProcess(processConfig);
-                        logger.info("Acquire task:"
-                                + ((success) ? "Success" : "failure"));
-                        if (success) {
-                            return processConfig;
-                        }
-                    }
-                }
-                Thread.sleep(5000);
-            } catch (Exception e) {
-                logger.error("Exception in acquireTask Method:"
-                        + customTaskData, e);
-            }
-        }
-    }
-
-    private boolean takeProcess(Map<String, String> processConfig) {
-        File lockFile = null;
-        try {
-            // TODO:contruct from processConfig
-            String lockFileName = createLockFileName(processConfig);
-            lockFile = new File(lockFileName);
-            if (!lockFile.exists()) {
-                FileOutputStream fos = new FileOutputStream(lockFile);
-                FileLock fl = fos.getChannel().tryLock();
-                if (fl != null) {
-                    String message = "Task acquired by PID:"
-                            + SystemUtils.getPID() + " HOST:"
-                            + InetAddress.getLocalHost().getHostName();
-                    fos.write(message.getBytes());
-                    fos.close();
-                    logger.info(message + "  Lock File location: "
-                            + lockFile.getAbsolutePath());
-                    return true;
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Exception trying to take up process:" + processConfig,
-                         e);
-        } finally {
-            if (lockFile != null) {
-                lockFile.deleteOnExit();
-            }
-        }
-        return false;
-    }
-
-    private String createLockFileName(Map<String, String> processConfig) {
-        String lockDir = System.getProperty("lock_dir");
-        String lockFileName = clusterName + processConfig.get("ID");
-        if (lockDir != null && lockDir.trim().length() > 0) {
-            File file = new File(lockDir);
-            if (!file.exists()) {
-                file.mkdirs();
-            }
-            return lockDir + "/" + lockFileName;
-        } else {
-            return lockFileName;
-        }
-    }
-
-    private boolean canTakeupProcess(Map<String, String> processConfig) {
-        String host = processConfig.get("process.host");
-        try {
-            InetAddress inetAddress = InetAddress.getByName(host);
-            logger.info("Host Name: "
-                    + InetAddress.getLocalHost().getCanonicalHostName());
-            if (!host.equals("localhost")) {
-                if (!InetAddress.getLocalHost().equals(inetAddress)) {
-                    return false;
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Invalid host:" + host);
-            return false;
-        }
-        String lockFileName = createLockFileName(processConfig);
-        File lockFile = new File(lockFileName);
-        if (!lockFile.exists()) {
-            return true;
-        } else {
-            logger.info("Process taken up by another process lockFile:"
-                    + lockFileName);
-        }
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/test/ProcessMonitorTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/test/ProcessMonitorTest.java b/s4-comm/src/main/java/io/s4/comm/test/ProcessMonitorTest.java
deleted file mode 100644
index a31b70b..0000000
--- a/s4-comm/src/main/java/io/s4/comm/test/ProcessMonitorTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.test;
-
-import io.s4.comm.file.StaticProcessMonitor;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.zk.ZkTaskSetup;
-import io.s4.comm.zk.ZkTaskManager;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ProcessMonitorTest {
-    public static void main(String[] args) throws Exception {
-        // testZkProcessMonitor(args);
-        testStaticProcessMonitor(args);
-        Thread.sleep(10000);
-    }
-
-    private static void testStaticProcessMonitor(String[] args) {
-        String address = null;
-        address = "localhost:2181";
-        StaticProcessMonitor monitor = new StaticProcessMonitor(address,
-                                                                "taskmanagerTest",
-                                                                ClusterType.S4);
-        monitor.monitor();
-        System.out.println(monitor.getDestinationList());
-        System.out.println(monitor.getDestinationMap());
-    }
-
-    private static void testZkProcessMonitor(String[] args) {
-        System.out.println("Hereh");
-        // "effortfell.greatamerica.corp.yahoo.com:2181"
-        String address = args[0];
-        address = "localhost:2181";
-        String processName = args[1];
-        ZkTaskSetup zkTaskSetup = new ZkTaskSetup(address,
-                                                        "/taskmanagerTest",
-                                                        ClusterType.S4);
-        zkTaskSetup.cleanUp();
-        zkTaskSetup.setUpTasks("1.0.0.", new String[] { "task0", "task1" });
-        Object obj;
-        System.out.println(processName + " Going to Wait for a task");
-        HashMap<String, String> map = new HashMap<String, String>();
-        ZkTaskManager taskManager = new ZkTaskManager(address,
-                                                      "/taskmanagerTest",
-                                                      ClusterType.S4);
-        obj = taskManager.acquireTask(map);
-        System.out.println(processName + "taking up task: " + obj);
-        File f = new File("c:/" + obj + ".file");
-        f.delete();
-        while (true) {
-            if (f.exists()) {
-                break;
-            }
-            System.out.println(processName + " processing task: " + obj);
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-        System.out.println("Exiting task:" + obj);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/test/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/test/TaskManagerTest.java b/s4-comm/src/main/java/io/s4/comm/test/TaskManagerTest.java
deleted file mode 100644
index 1ba32d7..0000000
--- a/s4-comm/src/main/java/io/s4/comm/test/TaskManagerTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.test;
-
-import io.s4.comm.core.TaskManager;
-import io.s4.comm.file.StaticTaskManager;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.zk.ZkTaskSetup;
-import io.s4.comm.zk.ZkTaskManager;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TaskManagerTest {
-    public static void main(String[] args) throws Exception {
-        // testZkTaskManager(args);
-        testStaticTaskManager(args);
-        Thread.sleep(10000);
-    }
-
-    private static void testStaticTaskManager(String[] args) {
-        String address = null;
-        address = "localhost:2181";
-        TaskManager taskManager = new StaticTaskManager(address,
-                                                        "taskmanagerTest",
-                                                        ClusterType.S4,
-                                                        null);
-        Map<String, String> customTaskData = new HashMap<String, String>();
-        Object acquireTask = taskManager.acquireTask(customTaskData);
-        System.out.println("Acuired Task:" + acquireTask);
-
-    }
-
-    private static void testZkTaskManager(String[] args) {
-        System.out.println("Here");
-        // "effortfell.greatamerica.corp.yahoo.com:2181"
-        String address = args[0];
-        address = "localhost:2181";
-        String processName = args[1];
-        ZkTaskSetup taskSetup = new ZkTaskSetup(address,
-                                                      "/taskmanagerTest",
-                                                      ClusterType.S4);
-        taskSetup.cleanUp();
-        taskSetup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" });
-        Object obj;
-        System.out.println(processName + " Going to Wait for a task");
-        HashMap<String, String> map = new HashMap<String, String>();
-        ZkTaskManager taskManager = new ZkTaskManager(address,
-                                                      "/taskmanagerTest",
-                                                      ClusterType.S4);
-        obj = taskManager.acquireTask(map);
-        System.out.println(processName + "taking up task: " + obj);
-        File f = new File("c:/" + obj + ".file");
-        f.delete();
-        while (true) {
-            if (f.exists()) {
-                break;
-            }
-            System.out.println(processName + " processing task: " + obj);
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-        System.out.println("Exiting task:" + obj);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/test/TestTaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/test/TestTaskSetupApp.java b/s4-comm/src/main/java/io/s4/comm/test/TestTaskSetupApp.java
deleted file mode 100644
index 6ced765..0000000
--- a/s4-comm/src/main/java/io/s4/comm/test/TestTaskSetupApp.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.test;
-
-import io.s4.comm.util.CommUtil;
-import io.s4.comm.util.JSONUtil;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.zk.ZkTaskSetup;
-import io.s4.comm.zk.ZkTaskManager;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-public class TestTaskSetupApp {
-
-    public static void main(String[] args) throws Exception {
-        new TestTaskSetupApp().testTaskSetup1();
-    }
-
-    // test the case
-    public void testTaskSetup1() throws Exception {
-        String address = "effortfell.greatamerica.corp.yahoo.com:2181";
-        Watcher watcher = new Watcher() {
-
-            @Override
-            public void process(WatchedEvent event) {
-
-            }
-
-        };
-        // setup
-        ZooKeeper zk = new ZooKeeper(address, 30000, watcher);
-        String root = "/tasksetup_app_test";
-        ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4);
-        Map<String, String> task1 = new HashMap<String, String>();
-        task1.put("name", "task-1");
-
-        Map<String, String> task2 = new HashMap<String, String>();
-        task2.put("name", "task-2");
-        String tasksListRoot = root + "/tasks";
-        zkSetup.cleanUp();
-        Stat exists = zk.exists(tasksListRoot, false);
-        myassert(exists == null);
-        Object[] data = new Object[] { task1, task2 };
-        zkSetup.setUpTasks(data);
-
-        // verify that tasks are created
-        exists = zk.exists(tasksListRoot, false);
-        myassert(exists != null);
-        List<String> children = zk.getChildren(tasksListRoot, false);
-        myassert(children.size() == data.length);
-        boolean[] matched = new boolean[data.length];
-        for (String child : children) {
-            System.out.println(child);
-            String childPath = tasksListRoot + "/" + child;
-            Stat sTemp = zk.exists(childPath, false);
-            byte[] tempData = zk.getData(tasksListRoot + "/" + child,
-                                         false,
-                                         sTemp);
-            Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
-            // check if it matches any of the data
-            for (int i = 0; i < data.length; i++) {
-                Map<String, Object> newData = (Map<String, Object>) data[i];
-                if (!matched[i] && CommUtil.compareMaps(newData, map)) {
-                    matched[i] = true;
-                    break;
-                }
-            }
-        }
-        for (int i = 0; i < matched.length; i++) {
-            myassert(matched[i]);
-        }
-
-        // try running again and make verify new node is not created
-        Stat oldStat = zk.exists(tasksListRoot, false);
-        System.out.println("oldStat=" + oldStat);
-        zkSetup.setUpTasks(data);
-        Stat newStat = zk.exists(tasksListRoot, false);
-        System.out.println("newstat=" + newStat);
-        myassert(oldStat.getMtime() == newStat.getMtime());
-
-        // make change to task config and try running again and verify new
-        // config is uploaded
-        oldStat = zk.exists(tasksListRoot, false);
-        System.out.println("oldStat=" + oldStat.getVersion());
-        ((Map<String, String>) data[data.length - 1]).put("name", "changedname");
-        zkSetup.setUpTasks(data);
-        newStat = zk.exists(tasksListRoot, false);
-        System.out.println("newstat=" + newStat.getVersion());
-        System.out.println();
-        myassert(oldStat.getMtime() != newStat.getMtime());
-
-        // ensure version change is working
-        zkSetup.setUpTasks("1.0.0.0", data);
-    }
-
-    private void myassert(boolean b) {
-        if (!b) {
-            throw new AssertionError();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/test/ZkQueueTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/test/ZkQueueTest.java b/s4-comm/src/main/java/io/s4/comm/test/ZkQueueTest.java
deleted file mode 100644
index ebfef29..0000000
--- a/s4-comm/src/main/java/io/s4/comm/test/ZkQueueTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.test;
-
-import io.s4.comm.zk.ZkQueue;
-
-import org.apache.zookeeper.KeeperException;
-
-public class ZkQueueTest {
-    public static void main(String args[]) {
-        ZkQueue q = new ZkQueue(args[0], "/app1");
-
-        System.out.println("Input: " + args[0]);
-        int i;
-        Integer max = new Integer(args[1]);
-
-        if (args[2].equals("p")) {
-            System.out.println("Producer");
-            for (i = 0; i < max; i++)
-                try {
-                    q.produce(new Integer(10 + i));
-                } catch (KeeperException e) {
-
-                } catch (InterruptedException e) {
-
-                }
-        } else {
-            System.out.println("Consumer");
-
-            for (i = 0; i < max || true; i++) {
-                try {
-                    Integer r = (Integer) q.consume();
-                    System.out.println("Item: " + r);
-                } catch (KeeperException e) {
-                    e.printStackTrace();
-                    i--;
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/tools/TaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/tools/TaskSetupApp.java b/s4-comm/src/main/java/io/s4/comm/tools/TaskSetupApp.java
deleted file mode 100644
index 7cdde01..0000000
--- a/s4-comm/src/main/java/io/s4/comm/tools/TaskSetupApp.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.tools;
-
-import io.s4.comm.util.ConfigUtils;
-import io.s4.comm.util.ConfigParser;
-import io.s4.comm.util.ConfigParser.Cluster;
-import io.s4.comm.util.ConfigParser.Config;
-import io.s4.comm.zk.ZkTaskSetup;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class will set up initial tasks on the zookeeper USAGE: java AppTask
- * [clean] setup config.xml
- * 
- * @author kishoreg
- * 
- */
-public class TaskSetupApp {
-    public static void main(String[] args) {
-        String zkAddress = "";
-        boolean clean = false;
-        boolean setup = false;
-        String setupXml = null;
-        for (int i = 0; i < args.length; i++) {
-            if (i == 0) {
-                zkAddress = args[0];
-            }
-            if (args[i].equals("clean")) {
-                clean = true;
-            } else if (args[i].equals("setup")) {
-                setup = true;
-            } else if (i == args.length - 1) {
-                setupXml = args[i];
-            }
-        }
-        if (setupXml == null || !new File(setupXml).exists()) {
-            printusage("Set up xml: " + setupXml + " does not exist");
-        }
-        if (!setup && !clean) {
-            System.err.println("Invalid usage.");
-            printusage("Must specify at least one of of clean, setup.");
-        }
-        doMain(zkAddress, clean, setup, setupXml);
-    }
-
-    private static void printusage(String message) {
-        System.err.println(message);
-        System.err.println("java TaskSetupApp <zk_address> [clean|setup] setup_config_xml");
-        System.exit(1);
-    }
-
-    private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) {
-        ConfigParser parser = new ConfigParser();
-        Config config = parser.parse(setupXml);
-        for (Cluster cluster : config.getClusters()) {
-            processCluster(clean, zkAddress, cluster, config.getVersion());
-        }
-    }
-
-    private static void processCluster(boolean clean, String zkAddress, Cluster cluster, String version) {
-        List<Map<String,String>> clusterInfo = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false);
-        ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType());
-        if (clean) {
-            zkSetup.cleanUp();
-        }
-        
-        zkSetup.setUpTasks(version, clusterInfo.toArray());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/util/CommUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/util/CommUtil.java b/s4-comm/src/main/java/io/s4/comm/util/CommUtil.java
deleted file mode 100644
index 45f2e91..0000000
--- a/s4-comm/src/main/java/io/s4/comm/util/CommUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.comm.util;
-
-import java.util.Map;
-
-public class CommUtil {
-
-    public static boolean compareMaps(Map<String, Object> map1, Map<String, Object> map2) {
-        boolean equals = true;
-        if (map1.size() == map2.size()) {
-            for (String key : map1.keySet()) {
-                if (!(map2.containsKey(key) && map1.get(key)
-                                                   .equals(map2.get(key)))) {
-                    equals = false;
-                    break;
-                }
-            }
-        } else {
-            equals = false;
-        }
-        return equals;
-    }
-
-}