You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC
[21/50] [abbrv] git commit: Rename packages in preparation for move
to Apache
Rename packages in preparation for move to Apache
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/2f93667d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/2f93667d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/2f93667d
Branch: refs/heads/dev
Commit: 2f93667d98ebbb00de75417ba4e9c289b7fc8e6f
Parents: 9adaa07
Author: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Authored: Sun Nov 20 21:22:54 2011 -0800
Committer: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Committed: Sun Nov 20 21:22:54 2011 -0800
----------------------------------------------------------------------
.../java/io/s4/comm/core/CommEventCallback.java | 24 -
.../main/java/io/s4/comm/core/CommLayerState.java | 5 -
.../java/io/s4/comm/core/CommServiceFactory.java | 74 ---
.../main/java/io/s4/comm/core/DefaultWatcher.java | 112 ----
.../main/java/io/s4/comm/core/Deserializer.java | 21 -
.../main/java/io/s4/comm/core/GenericListener.java | 96 ----
.../main/java/io/s4/comm/core/GenericSender.java | 189 -------
.../main/java/io/s4/comm/core/GenericSerDeser.java | 30 -
.../main/java/io/s4/comm/core/ListenerProcess.java | 91 ---
.../main/java/io/s4/comm/core/MulticastSender.java | 79 ---
.../main/java/io/s4/comm/core/ProcessMonitor.java | 33 --
.../src/main/java/io/s4/comm/core/SendMode.java | 20 -
.../main/java/io/s4/comm/core/SenderProcess.java | 135 -----
.../src/main/java/io/s4/comm/core/Serializer.java | 21 -
.../src/main/java/io/s4/comm/core/TaskManager.java | 24 -
.../java/io/s4/comm/file/StaticProcessMonitor.java | 78 ---
.../java/io/s4/comm/file/StaticTaskManager.java | 191 -------
.../java/io/s4/comm/test/ProcessMonitorTest.java | 79 ---
.../main/java/io/s4/comm/test/TaskManagerTest.java | 82 ---
.../java/io/s4/comm/test/TestTaskSetupApp.java | 122 ----
.../src/main/java/io/s4/comm/test/ZkQueueTest.java | 57 --
.../main/java/io/s4/comm/tools/TaskSetupApp.java | 86 ---
.../src/main/java/io/s4/comm/util/CommUtil.java | 38 --
.../main/java/io/s4/comm/util/ConfigParser.java | 439 ---------------
.../src/main/java/io/s4/comm/util/ConfigUtils.java | 58 --
s4-comm/src/main/java/io/s4/comm/util/IOUtil.java | 74 ---
.../src/main/java/io/s4/comm/util/JSONUtil.java | 285 ----------
.../src/main/java/io/s4/comm/util/SystemUtils.java | 39 --
.../src/main/java/io/s4/comm/zk/ThreadTest.java | 64 ---
.../main/java/io/s4/comm/zk/ZkProcessMonitor.java | 147 -----
s4-comm/src/main/java/io/s4/comm/zk/ZkQueue.java | 117 ----
.../src/main/java/io/s4/comm/zk/ZkTaskManager.java | 153 -----
.../src/main/java/io/s4/comm/zk/ZkTaskSetup.java | 282 ---------
s4-comm/src/main/java/io/s4/comm/zk/ZkUtil.java | 144 -----
.../org/apache/s4/comm/core/CommEventCallback.java | 24 +
.../org/apache/s4/comm/core/CommLayerState.java | 5 +
.../apache/s4/comm/core/CommServiceFactory.java | 74 +++
.../org/apache/s4/comm/core/DefaultWatcher.java | 112 ++++
.../java/org/apache/s4/comm/core/Deserializer.java | 21 +
.../org/apache/s4/comm/core/GenericListener.java | 96 ++++
.../org/apache/s4/comm/core/GenericSender.java | 189 +++++++
.../org/apache/s4/comm/core/GenericSerDeser.java | 30 +
.../org/apache/s4/comm/core/ListenerProcess.java | 91 +++
.../org/apache/s4/comm/core/MulticastSender.java | 79 +++
.../org/apache/s4/comm/core/ProcessMonitor.java | 33 ++
.../java/org/apache/s4/comm/core/SendMode.java | 20 +
.../org/apache/s4/comm/core/SenderProcess.java | 135 +++++
.../java/org/apache/s4/comm/core/Serializer.java | 21 +
.../java/org/apache/s4/comm/core/TaskManager.java | 24 +
.../apache/s4/comm/file/StaticProcessMonitor.java | 78 +++
.../org/apache/s4/comm/file/StaticTaskManager.java | 191 +++++++
.../apache/s4/comm/test/ProcessMonitorTest.java | 79 +++
.../org/apache/s4/comm/test/TaskManagerTest.java | 82 +++
.../org/apache/s4/comm/test/TestTaskSetupApp.java | 122 ++++
.../java/org/apache/s4/comm/test/ZkQueueTest.java | 57 ++
.../org/apache/s4/comm/tools/TaskSetupApp.java | 86 +++
.../java/org/apache/s4/comm/util/CommUtil.java | 38 ++
.../java/org/apache/s4/comm/util/ConfigParser.java | 439 +++++++++++++++
.../java/org/apache/s4/comm/util/ConfigUtils.java | 58 ++
.../main/java/org/apache/s4/comm/util/IOUtil.java | 74 +++
.../java/org/apache/s4/comm/util/JSONUtil.java | 285 ++++++++++
.../java/org/apache/s4/comm/util/SystemUtils.java | 39 ++
.../java/org/apache/s4/comm/zk/ThreadTest.java | 64 +++
.../org/apache/s4/comm/zk/ZkProcessMonitor.java | 147 +++++
.../main/java/org/apache/s4/comm/zk/ZkQueue.java | 117 ++++
.../java/org/apache/s4/comm/zk/ZkTaskManager.java | 153 +++++
.../java/org/apache/s4/comm/zk/ZkTaskSetup.java | 282 +++++++++
.../main/java/org/apache/s4/comm/zk/ZkUtil.java | 144 +++++
s4-core/src/main/java/org/apache/s4/MainApp.java | 2 +-
.../org/apache/s4/client/util/ObjectBuilder.java | 4 +-
.../dispatcher/partitioner/DefaultPartitioner.java | 4 +-
.../java/org/apache/s4/processor/PEContainer.java | 2 +-
.../src/test/java/io/s4/ft/CheckpointingTest.java | 108 ----
s4-core/src/test/java/io/s4/ft/EventGenerator.java | 63 --
s4-core/src/test/java/io/s4/ft/KeyValue.java | 33 --
s4-core/src/test/java/io/s4/ft/RecoveryTest.java | 113 ----
s4-core/src/test/java/io/s4/ft/S4App.java | 201 -------
s4-core/src/test/java/io/s4/ft/S4TestCase.java | 52 --
.../test/java/io/s4/ft/SimpleEventProducer.java | 54 --
s4-core/src/test/java/io/s4/ft/StatefulTestPE.java | 136 -----
.../test/java/io/s4/ft/TestRedisStateStorage.java | 146 -----
s4-core/src/test/java/io/s4/ft/TestUtils.java | 399 -------------
s4-core/src/test/java/io/s4/ft/adapter.properties | 2 -
s4-core/src/test/java/io/s4/ft/adapter_conf.xml | 40 --
.../src/test/java/io/s4/ft/app_adapter_conf.xml | 14 -
s4-core/src/test/java/io/s4/ft/app_conf.xml | 26 -
.../test/java/io/s4/ft/s4_core_conf_bk_backend.xml | 198 -------
.../test/java/io/s4/ft/s4_core_conf_fs_backend.xml | 196 -------
s4-core/src/test/java/io/s4/ft/wall_clock.xml | 6 -
.../java/io/s4/ft/wordcount/FTWordCountTest.java | 159 ------
.../src/test/java/io/s4/ft/wordcount/app_conf.xml | 94 ---
.../io/s4/ft/wordcount/s4_core_conf_fs_backend.xml | 196 -------
.../s4/ft/wordcount/s4_core_conf_redis_backend.xml | 198 -------
.../test/java/io/s4/ft/wordcount/wall_clock.xml | 6 -
s4-core/src/test/java/io/s4/processor/MockPE.java | 29 -
.../java/io/s4/processor/TestPrototypeWrapper.java | 31 -
s4-core/src/test/java/io/s4/wordcount/Word.java | 22 -
.../test/java/io/s4/wordcount/WordClassifier.java | 109 ----
.../src/test/java/io/s4/wordcount/WordCount.java | 43 --
.../test/java/io/s4/wordcount/WordCountTest.java | 83 ---
.../src/test/java/io/s4/wordcount/WordCounter.java | 60 --
.../test/java/io/s4/wordcount/WordSplitter.java | 61 --
s4-core/src/test/java/io/s4/wordcount/app_conf.xml | 92 ---
.../src/test/java/io/s4/wordcount/s4_core_conf.xml | 194 -------
.../src/test/java/io/s4/wordcount/wall_clock.xml | 6 -
.../java/org/apache/s4/ft/CheckpointingTest.java | 108 ++++
.../test/java/org/apache/s4/ft/EventGenerator.java | 63 ++
.../src/test/java/org/apache/s4/ft/KeyValue.java | 33 ++
.../test/java/org/apache/s4/ft/RecoveryTest.java | 113 ++++
s4-core/src/test/java/org/apache/s4/ft/S4App.java | 201 +++++++
.../src/test/java/org/apache/s4/ft/S4TestCase.java | 52 ++
.../java/org/apache/s4/ft/SimpleEventProducer.java | 54 ++
.../test/java/org/apache/s4/ft/StatefulTestPE.java | 136 +++++
.../org/apache/s4/ft/TestRedisStateStorage.java | 146 +++++
.../src/test/java/org/apache/s4/ft/TestUtils.java | 399 +++++++++++++
.../test/java/org/apache/s4/ft/adapter.properties | 2 +
.../test/java/org/apache/s4/ft/adapter_conf.xml | 40 ++
.../java/org/apache/s4/ft/app_adapter_conf.xml | 14 +
.../src/test/java/org/apache/s4/ft/app_conf.xml | 26 +
.../org/apache/s4/ft/s4_core_conf_bk_backend.xml | 198 +++++++
.../org/apache/s4/ft/s4_core_conf_fs_backend.xml | 196 +++++++
.../src/test/java/org/apache/s4/ft/wall_clock.xml | 6 +
.../apache/s4/ft/wordcount/FTWordCountTest.java | 159 ++++++
.../java/org/apache/s4/ft/wordcount/app_conf.xml | 94 +++
.../s4/ft/wordcount/s4_core_conf_fs_backend.xml | 196 +++++++
.../s4/ft/wordcount/s4_core_conf_redis_backend.xml | 198 +++++++
.../java/org/apache/s4/ft/wordcount/wall_clock.xml | 6 +
.../test/java/org/apache/s4/processor/MockPE.java | 29 +
.../apache/s4/processor/TestPrototypeWrapper.java | 31 +
.../test/java/org/apache/s4/wordcount/Word.java | 22 +
.../org/apache/s4/wordcount/WordClassifier.java | 109 ++++
.../java/org/apache/s4/wordcount/WordCount.java | 43 ++
.../org/apache/s4/wordcount/WordCountTest.java | 83 +++
.../java/org/apache/s4/wordcount/WordCounter.java | 60 ++
.../java/org/apache/s4/wordcount/WordSplitter.java | 61 ++
.../test/java/org/apache/s4/wordcount/app_conf.xml | 92 +++
.../java/org/apache/s4/wordcount/s4_core_conf.xml | 194 +++++++
.../java/org/apache/s4/wordcount/wall_clock.xml | 6 +
138 files changed, 6665 insertions(+), 6665 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/CommEventCallback.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/CommEventCallback.java b/s4-comm/src/main/java/io/s4/comm/core/CommEventCallback.java
deleted file mode 100644
index a7a50f7..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/CommEventCallback.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import java.util.Map;
-
-public interface CommEventCallback {
-
- public void handleCallback(Map<String, Object> eventData);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/CommLayerState.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/CommLayerState.java b/s4-comm/src/main/java/io/s4/comm/core/CommLayerState.java
deleted file mode 100644
index add6284..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/CommLayerState.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package io.s4.comm.core;
-
-public enum CommLayerState {
- INITIALIZED, BROKEN
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/CommServiceFactory.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/CommServiceFactory.java b/s4-comm/src/main/java/io/s4/comm/core/CommServiceFactory.java
deleted file mode 100644
index 41900e3..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/CommServiceFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import io.s4.comm.file.StaticProcessMonitor;
-import io.s4.comm.file.StaticTaskManager;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.zk.ZkProcessMonitor;
-import io.s4.comm.zk.ZkTaskManager;
-
-import org.apache.log4j.Logger;
-
-/**
- * Common Factory class to provide appropriate implementations
- *
- * @author kishoreg
- *
- */
-public class CommServiceFactory {
- private static Logger logger = Logger.getLogger(CommServiceFactory.class);
-
- public static TaskManager getTaskManager(String zkaddress,
- String clusterName, ClusterType clusterType,
- CommEventCallback callbackHandler) {
- String mode = System.getProperty("commlayer.mode");
- TaskManager taskManager = null;
- if (mode != null && mode.equalsIgnoreCase("static")) {
- logger.info("Comm layer mode is set to static");
- taskManager = new StaticTaskManager(zkaddress,
- clusterName,
- clusterType,
- callbackHandler);
- } else {
- taskManager = new ZkTaskManager(zkaddress,
- clusterName,
- clusterType,
- callbackHandler);
- }
-
- return taskManager;
- }
-
- public static ProcessMonitor getProcessMonitor(String zkaddress,
- String clusterName, CommEventCallback callbackHandler) {
- ProcessMonitor processMonitor = null;
- String mode = System.getProperty("commlayer.mode");
- if (mode != null && mode.equalsIgnoreCase("static")) {
- logger.info("Comm layer mode is set to static");
- processMonitor = new StaticProcessMonitor(zkaddress,
- clusterName,
- ClusterType.S4);
- } else {
- processMonitor = new ZkProcessMonitor(zkaddress,
- clusterName,
- ClusterType.S4,
- callbackHandler);
- }
- return processMonitor;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/DefaultWatcher.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/DefaultWatcher.java b/s4-comm/src/main/java/io/s4/comm/core/DefaultWatcher.java
deleted file mode 100644
index 93c11a4..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/DefaultWatcher.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-public class DefaultWatcher implements Watcher {
-
- public static List<KeeperState> interestingStates = new ArrayList<KeeperState>();
- static {
- interestingStates.add(KeeperState.Expired);
- interestingStates.add(KeeperState.SyncConnected);
- }
- public static Logger logger = Logger.getLogger(DefaultWatcher.class);
- protected ZooKeeper zk = null;
- protected Integer mutex;
- protected String root;
- protected WatchedEvent currentEvent;
- protected CommEventCallback callbackHandler;
- private String zkAddress;
- volatile boolean connected = false;
-
- protected DefaultWatcher(String address) {
- this(address, null);
- }
-
- protected DefaultWatcher(String address, CommEventCallback callbackHandler) {
- this.zkAddress = address;
- this.callbackHandler = callbackHandler;
- if (zk == null) {
- try {
- logger.info("Connecting to zookeeper server:" + address);
- String sTimeout = System.getProperty("zk.session.timeout");
- System.out.println("sTimeout=" + sTimeout);
- int timeout = 30000;
- if (sTimeout != null) {
- try {
- timeout = Integer.parseInt(sTimeout);
- } catch (Exception e) {
- // ignore will use default
- }
- }
- mutex = new Integer(-1);
- synchronized (mutex) {
- zk = new ZooKeeper(address, timeout, this);
- while (!connected) {
- logger.info("Waiting for connection to be established ");
- mutex.wait();
- }
- }
- logger.info("Connected to zookeeper with sessionid: "
- + zk.getSessionId() + " and session timeout(ms): "
- + timeout);
-
- } catch (Exception e) {
- logger.error("Failed to connect to zookeeper:" + e.getMessage(),
- e);
- zk = null;
- throw new RuntimeException(e);
- }
- }
- }
-
- synchronized public void process(WatchedEvent event) {
- logger.info("Received zk event:" + event);
- synchronized (mutex) {
- currentEvent = event;
- if (event.getState() == KeeperState.SyncConnected) {
- connected = true;
- }
- if (callbackHandler != null
- && interestingStates.contains(event.getState())) {
- Map<String, Object> eventData = new HashMap<String, Object>();
- if (event.getState() == KeeperState.SyncConnected) {
- eventData.put("state", CommLayerState.INITIALIZED);
- } else if (event.getState() == KeeperState.Expired) {
- eventData.put("state", CommLayerState.BROKEN);
- }
- eventData.put("source", event);
- callbackHandler.handleCallback(eventData);
- }
- mutex.notify();
- }
- }
-
- public String getZkAddress() {
- return zkAddress;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/Deserializer.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/Deserializer.java b/s4-comm/src/main/java/io/s4/comm/core/Deserializer.java
deleted file mode 100644
index 10f335b..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/Deserializer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-public interface Deserializer {
-
- public Object deserialize(byte[] buffer);
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/GenericListener.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/GenericListener.java b/s4-comm/src/main/java/io/s4/comm/core/GenericListener.java
deleted file mode 100644
index 79b7ba8..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/GenericListener.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class GenericListener {
- private static Logger logger = Logger.getLogger(GenericListener.class);
- private String zkAddress;
- private DatagramSocket socket;
- int BUFFER_LENGTH = 65507;
- private DatagramPacket dgram;
- private byte[] bs;
- final Deserializer deserializer;
-
- public GenericListener(String zkaddress, String appName,
- Object listenerConfig) {
- this(zkaddress, appName, listenerConfig, new GenericSerDeser());
- }
-
- public GenericListener(String zkaddress, String appName,
- Object listenerConfig, Deserializer deserializer) {
- this.zkAddress = zkAddress;
- this.deserializer = deserializer;
- try {
- Map<String, String> map = (Map<String, String>) listenerConfig;
- String mode = map.get("mode");
- int port = Integer.parseInt(map.get("port"));
- if (mode.equals("multicast")) {
- InetAddress inetAddress = InetAddress.getByName(map.get("channel"));
- socket = new MulticastSocket(port);
- ((MulticastSocket) socket).joinGroup(inetAddress);
- }
- if (mode.equals("unicast")) {
- socket = new DatagramSocket(port);
- }
- String udpBufferSize = System.getProperty("udp.buffer.size");
- if (udpBufferSize == null) {
- udpBufferSize = "4194302";
- }
- socket.setReceiveBufferSize(Integer.parseInt(udpBufferSize));
- bs = new byte[BUFFER_LENGTH];
- dgram = new DatagramPacket(bs, bs.length);
- } catch (IOException e) {
- logger.error("error creating listener", e);
- throw new RuntimeException(e);
- }
-
- }
-
- public Object receive() {
- try {
- socket.receive(dgram);
- byte[] data = new byte[dgram.getLength()];
- System.arraycopy(dgram.getData(),
- dgram.getOffset(),
- data,
- 0,
- data.length);
- Object object = deserializer.deserialize(data);
- dgram.setLength(BUFFER_LENGTH);
- return object;
- } catch (IOException e) {
- logger.error("error receiving message", e);
- throw new RuntimeException(e);
- }
- }
-
- /*
- * There is nothing much to do for multicast and unicast
- */
- public void start() {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/GenericSender.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/GenericSender.java b/s4-comm/src/main/java/io/s4/comm/core/GenericSender.java
deleted file mode 100644
index 1d223d1..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/GenericSender.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.List;
-import java.util.Map;
-import org.apache.log4j.Logger;
-
-public class GenericSender {
- static Logger logger = Logger.getLogger(GenericSender.class);
- Map<String, String> map;
- private DatagramSocket socket;
- private final String zkAddress;
- ProcessMonitor listenerMonitor;
- int rotationCounter = 0;
- private final Serializer serializer;
- private final int listenerTaskCount;
- private CommEventCallback callbackHandler;
- private String mode;
-
- public GenericSender(String zkAddress, String appName,
- Object senderConfigData) {
- this(zkAddress, appName, appName, senderConfigData);
- }
-
- @SuppressWarnings("unchecked")
- public GenericSender(String zkAddress, String adapterClusterName,
- String s4ClusterName, Object senderConfigData, Serializer serializer) {
- this.zkAddress = zkAddress;
- this.serializer = serializer;
- try {
- map = (Map<String, String>) senderConfigData;
- mode = map.get("mode");
- if (mode.equals("multicast")) {
- socket = new MulticastSocket();
- }
- if (mode.equals("unicast")) {
- socket = new DatagramSocket();
- }
- listenerMonitor = CommServiceFactory.getProcessMonitor(this.zkAddress,
- s4ClusterName,
- callbackHandler);
- if (callbackHandler != null) {
- // listenerMonitor.setCallbackHandler(callbackHandler);
- }
- listenerMonitor.monitor();
- this.listenerTaskCount = listenerMonitor.getTaskCount();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public GenericSender(String zkAddress, String senderAppName,
- String listenerAppName, Object senderConfigData) {
- this(zkAddress,
- senderAppName,
- listenerAppName,
- senderConfigData,
- new GenericSerDeser());
- }
-
- /**
- * This method will send the data to receivers in a round robin fashion
- *
- * @param data
- * @return true if data was successfully sent, false otherwise
- */
- @SuppressWarnings("unchecked")
- public boolean send(Object data) {
- try {
- List<Object> destinationList = listenerMonitor.getDestinationList();
- if (destinationList == null || destinationList.size() == 0) {
- logger.error("Failed to send message: No destination available"
- + data);
- return false;
- }
- byte[] byteBuffer = serializer.serialize(data);
- rotationCounter = rotationCounter + 1;
-
- int index = rotationCounter % destinationList.size();
- Map<String, String> dest = (Map<String, String>) destinationList.get(Math.abs(index));
- InetAddress inetAddress;
- int port;
- if (mode.equals("unicast")) {
- inetAddress = InetAddress.getByName(dest.get("address"));
- port = Integer.parseInt((dest.get("port")));
- } else if (mode.equals("multicast")) {
- inetAddress = InetAddress.getByName(dest.get("channel"));
- port = Integer.parseInt((dest.get("port")));
- } else {
- logger.error("Failed to send message unknown mode: " + mode);
- return false;
- }
- DatagramPacket dp = new DatagramPacket(byteBuffer,
- byteBuffer.length,
- inetAddress,
- port);
- socket.send(dp);
- } catch (IOException e) {
- // add retry
- logger.error("Failed to send message: " + data, e);
- return false;
- }
- return true;
- }
-
- /**
- * This will send the data to a specific channel/receiver/partition
- *
- * @param partition
- * @param data
- * @return true if data was successfully sent, false otherwise
- */
- @SuppressWarnings("unchecked")
- public boolean sendToPartition(int partition, Object data) {
- try {
- byte[] byteBuffer = serializer.serialize(data);
- Map<Integer, Object> destinationMap = listenerMonitor.getDestinationMap();
- if (logger.isDebugEnabled()) {
- logger.debug("Destination Map:" + destinationMap);
- }
- Map<String, String> dest = (Map<String, String>) destinationMap.get(partition);
- if (dest != null) {
- InetAddress inetAddress = InetAddress.getByName(dest.get("address"));
- int port = Integer.parseInt((dest.get("port")));
- DatagramPacket dp = new DatagramPacket(byteBuffer,
- byteBuffer.length,
- inetAddress,
- port);
- socket.send(dp);
- } else {
- logger.warn("Destination not available for partition:"
- + partition + " Skipping message:" + data);
- return false;
- }
- } catch (IOException e) {
- // add retry
- logger.error("Failed to send message: " + data, e);
- return false;
- }
-
- return true;
-
- }
-
- /**
- * compute partition using hashcode and send to appropriate partition
- *
- * @param hashcode
- * @param data
- * @return true if data was successfully sent, false otherwise
- */
- public boolean sendUsingHashCode(int hashcode, Object data) {
- int partition = (hashcode & Integer.MAX_VALUE) % listenerTaskCount;
- return sendToPartition(partition, data);
-
- }
-
- public CommEventCallback getCallbackHandler() {
- return callbackHandler;
- }
-
- public void setCallbackHandler(CommEventCallback callbackHandler) {
- this.callbackHandler = callbackHandler;
- }
-
- public int getListenerTaskCount() {
- return listenerTaskCount;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/GenericSerDeser.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/GenericSerDeser.java b/s4-comm/src/main/java/io/s4/comm/core/GenericSerDeser.java
deleted file mode 100644
index c1dc0bc..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/GenericSerDeser.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import io.s4.comm.util.IOUtil;
-
-public class GenericSerDeser implements Serializer, Deserializer {
-
- public byte[] serialize(Object obj) {
- return IOUtil.serializeToBytes(obj);
- }
-
- public Object deserialize(byte[] buffer) {
- return IOUtil.deserializeToObject(buffer);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java b/s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java
deleted file mode 100644
index c00e5cc..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class ListenerProcess {
- static Logger logger = Logger.getLogger(ListenerProcess.class);
- private final String zkaddress;
- private final String clusterName;
- private String listenerRoot;
- private GenericListener genericListener;
- private Deserializer deserializer;
- private CommEventCallback callbackHandler;
-
- public ListenerProcess(String zkaddress, String clusterName) {
- this.zkaddress = zkaddress;
- this.clusterName = clusterName;
- }
-
- /**
- * This will be a blocking call and will wait until it gets a task
- *
- * @return listener configuration
- */
- public Object acquireTaskAndCreateListener(Map<String, String> map) {
- TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
- clusterName,
- ClusterType.S4,
- callbackHandler);
- logger.info("Waiting for task");
- Object listenerConfig = manager.acquireTask(map);
- createListenerFromConfig(listenerConfig);
- return listenerConfig;
- }
-
- public void createListenerFromConfig(Object listenerConfig) {
- logger.info("Starting listener with config: " + listenerConfig);
- if (deserializer != null) {
- genericListener = new GenericListener(zkaddress,
- clusterName,
- listenerConfig,
- deserializer);
- } else {
- genericListener = new GenericListener(zkaddress,
- clusterName,
- listenerConfig);
- }
- genericListener.start();
-
- }
-
- public Deserializer getDeserializer() {
- return deserializer;
- }
-
- public void setDeserializer(Deserializer deserializer) {
- this.deserializer = deserializer;
- }
-
- public Object listen() {
- return genericListener.receive();
- }
-
- public CommEventCallback getCallbackHandler() {
- return callbackHandler;
- }
-
- public void setCallbackHandler(CommEventCallback callbackHandler) {
- this.callbackHandler = callbackHandler;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java b/s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java
deleted file mode 100644
index fbed170..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import io.s4.comm.util.IOUtil;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.Map;
-
-public class MulticastSender {
-
- Map<String, String> map;
- private MulticastSocket ms;
- private InetAddress inetAddress;
- private int port;
-
- @SuppressWarnings("unchecked")
- public MulticastSender(Object senderConfigData) {
- try {
- map = (Map<String, String>) senderConfigData;
- ms = new MulticastSocket();
- inetAddress = InetAddress.getByName(map.get("multicast.address"));
- ms.joinGroup(inetAddress);
- this.port = Integer.parseInt(map.get("multicast.port"));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * This method will send the data to receivers in a round robin fashion
- *
- * @param data
- * @return true if data was successfully sent, false otherwise
- */
- public boolean send(Object data) {
- try {
- byte[] byteBuffer = IOUtil.serializeToBytes(data);
- DatagramPacket dp = new DatagramPacket(byteBuffer,
- byteBuffer.length,
- inetAddress,
- port);
- ms.send(dp);
- } catch (IOException e) {
- // add retry
- e.printStackTrace();
- return false;
- }
- return true;
- }
-
- /**
- * This will send the data to a specific channel/receiver
- *
- * @param partition
- * @param data
- * @return true if data was successfully sent, false otherwise
- */
- public boolean send(int partition, Object data) {
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/ProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/ProcessMonitor.java b/s4-comm/src/main/java/io/s4/comm/core/ProcessMonitor.java
deleted file mode 100644
index b73df8f..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/ProcessMonitor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import java.util.List;
-import java.util.Map;
-
-public interface ProcessMonitor {
-
- // void setCallbackHandler(CommEventCallback callbackHandler);
-
- void monitor();
-
- List<Object> getDestinationList();
-
- Map<Integer, Object> getDestinationMap();
-
- int getTaskCount();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/SendMode.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/SendMode.java b/s4-comm/src/main/java/io/s4/comm/core/SendMode.java
deleted file mode 100644
index 41a59cf..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/SendMode.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-public enum SendMode {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java b/s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java
deleted file mode 100644
index 0e20715..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.Map;
-
-public class SenderProcess {
- protected final String zkaddress;
- protected final String adapterClusterName;
- protected final String s4ClusterName;
- protected Serializer serializer;
- protected CommEventCallback callbackHandler;
-
- public SenderProcess(String zkaddress, String clusterName) {
- this(zkaddress, clusterName, clusterName);
- }
-
- public SenderProcess(String zkaddress, String adapterClusterName,
- String s4ClusterName) {
- this.zkaddress = zkaddress;
- this.adapterClusterName = adapterClusterName;
- this.s4ClusterName = s4ClusterName;
- }
-
- public void setSerializer(Serializer serializer) {
- this.serializer = serializer;
- }
-
- public CommEventCallback getCallbackHandler() {
- return callbackHandler;
- }
-
- public void setCallbackHandler(CommEventCallback callbackHandler) {
- this.callbackHandler = callbackHandler;
- }
-
- protected GenericSender genericSender;
-
- /**
- * This will be a blocking call and will wait until it gets a task
- *
- * @return senderConfig object, currently its map
- */
-
- public Object acquireTaskAndCreateSender(Map<String, String> map) {
- TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
- adapterClusterName,
- ClusterType.ADAPTER,
- callbackHandler);
- if (callbackHandler != null) {
- // manager.setCallbackHandler(callbackHandler);
- }
- Object senderConfig = manager.acquireTask(map);
- createSenderFromConfig(senderConfig);
- return senderConfig;
- }
-
- public void createSenderFromConfig(Object senderConfig) {
- if (serializer != null) {
- this.genericSender = new GenericSender(zkaddress,
- adapterClusterName,
- s4ClusterName,
- senderConfig,
- serializer);
- } else {
- this.genericSender = new GenericSender(zkaddress,
- adapterClusterName,
- s4ClusterName,
- senderConfig);
- }
- if (callbackHandler != null) {
- this.genericSender.setCallbackHandler(callbackHandler);
- }
-
- }
-
- /**
- * This method will send the data to receivers in a round robin fashion
- *
- * @param data
- * @return true if data was successfully sent, false otherwise
- */
- public boolean send(Object data) {
- return genericSender.send(data);
- }
-
- /**
- * This will send the data to a specific channel/receiver/partition
- *
- * @param partition
- * @param data
- * @return true if data was successfully sent, false otherwise
- */
- public boolean sendToPartition(int partition, Object data) {
- return genericSender.sendToPartition(partition, data);
- }
-
- /**
- * compute partition using hashcode and send to appropriate partition
- *
- * @param hashcode
- * @param data
- * @return true if data was successfully sent, false otherwise
- */
-
- public boolean sendUsingHashCode(int hashcode, Object data) {
- return genericSender.sendUsingHashCode(hashcode, data);
- }
-
- /**
- * Returns the number of partitions on the receiver app side TODO: Currently
- * it returns the number of tasks on the listener side. It works for now
- * since numofPartitions=taskCount
- */
-
- public int getNumOfPartitions() {
- return genericSender.getListenerTaskCount();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/Serializer.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/Serializer.java b/s4-comm/src/main/java/io/s4/comm/core/Serializer.java
deleted file mode 100644
index a06f3f2..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/Serializer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-public interface Serializer {
-
- public byte[] serialize(Object obj);
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/core/TaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/core/TaskManager.java b/s4-comm/src/main/java/io/s4/comm/core/TaskManager.java
deleted file mode 100644
index 1082197..0000000
--- a/s4-comm/src/main/java/io/s4/comm/core/TaskManager.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.core;
-
-import java.util.Map;
-
-public interface TaskManager {
-
- Object acquireTask(Map<String, String> customTaskData);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/file/StaticProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/file/StaticProcessMonitor.java b/s4-comm/src/main/java/io/s4/comm/file/StaticProcessMonitor.java
deleted file mode 100644
index f877d57..0000000
--- a/s4-comm/src/main/java/io/s4/comm/file/StaticProcessMonitor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.file;
-
-import io.s4.comm.core.ProcessMonitor;
-import io.s4.comm.util.ConfigUtils;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class StaticProcessMonitor implements ProcessMonitor {
- static Logger logger = Logger.getLogger(StaticProcessMonitor.class);
- private List<Object> destinationList = new ArrayList<Object>();
- private Map<Integer, Object> destinationMap = new HashMap<Integer, Object>();
- private int taskCount;
- private final String clusterName;
- private final ClusterType clusterType;
-
- public StaticProcessMonitor(String address, String clusterName,
- ClusterType clusterType) {
- this.clusterName = clusterName;
- this.clusterType = clusterType;
- }
-
- public void monitor() {
- readConfig();
- }
-
- private void readConfig() {
- List<Map<String, String>> processList = ConfigUtils.readConfig("clusters.xml",
- clusterName,
- clusterType,
- true);
- for (Map<String, String> processMap : processList) {
- destinationList.add(processMap);
- String key = (String) processMap.get("partition");
- if (key != null) {
- destinationMap.put(Integer.parseInt(key), processMap);
- }
- }
- taskCount = destinationList.size();
- logger.info("Destination List: " + destinationList);
- logger.info("Destination Map: " + destinationMap);
- logger.info("TaskCount: " + taskCount);
- }
-
- public List<Object> getDestinationList() {
- return destinationList;
- }
-
- public Map<Integer, Object> getDestinationMap() {
- return destinationMap;
- }
-
- @Override
- public int getTaskCount() {
- return taskCount;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java b/s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java
deleted file mode 100644
index 3c52258..0000000
--- a/s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.file;
-
-import io.s4.comm.core.CommEventCallback;
-import io.s4.comm.core.CommLayerState;
-import io.s4.comm.core.TaskManager;
-import io.s4.comm.util.ConfigUtils;
-import io.s4.comm.util.SystemUtils;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.net.InetAddress;
-import java.nio.channels.FileLock;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-public class StaticTaskManager implements TaskManager {
- static Logger logger = Logger.getLogger(StaticTaskManager.class);
- Set<Map<String, String>> processSet = new HashSet<Map<String, String>>();
- private final String clusterName;
- private final ClusterType clusterType;
-
- /**
- * Constructor of TaskManager
- *
- * @param address
- * @param clusterName
- */
- public StaticTaskManager(String address, String clusterName,
- ClusterType clusterType, CommEventCallback callbackHandler) {
- this.clusterName = clusterName;
- this.clusterType = clusterType;
- // read the configuration file
- readStaticConfig();
- if (callbackHandler != null) {
- Map<String, Object> eventData = new HashMap<String, Object>();
- eventData.put("state", CommLayerState.INITIALIZED);
- callbackHandler.handleCallback(eventData);
- }
- }
-
- private void readStaticConfig() {
- // It should be available in classpath
- List<Map<String, String>> processList = ConfigUtils.readConfig("clusters.xml",
- clusterName,
- clusterType,
- true);
-
- processSet.addAll(processList);
- }
-
- /**
- * Will clean up taskList Node and process List Node
- */
- public boolean cleanUp() {
- throw new UnsupportedOperationException("cleanUp Not supported in red button mode");
- }
-
- /**
- * Creates task nodes.
- *
- * @param numTasks
- * @param data
- */
- public void setUpTasks(int numTasks, Object[] data) {
- throw new UnsupportedOperationException("setUpTasks Not supported in red button mode");
- }
-
- /**
- * This will block the process thread from starting the task, when it is
- * unblocked it will return the data stored in the task node. This data can
- * be used by the This call assumes that the tasks are already set up
- *
- * @return Object containing data related to the task
- */
- @Override
- public Object acquireTask(Map<String, String> customTaskData) {
- while (true) {
- try {
- for (Map<String, String> processConfig : processSet) {
- boolean processAvailable = canTakeupProcess(processConfig);
- logger.info("processAvailable:" + processAvailable);
- if (processAvailable) {
- boolean success = takeProcess(processConfig);
- logger.info("Acquire task:"
- + ((success) ? "Success" : "failure"));
- if (success) {
- return processConfig;
- }
- }
- }
- Thread.sleep(5000);
- } catch (Exception e) {
- logger.error("Exception in acquireTask Method:"
- + customTaskData, e);
- }
- }
- }
-
- private boolean takeProcess(Map<String, String> processConfig) {
- File lockFile = null;
- try {
- // TODO:contruct from processConfig
- String lockFileName = createLockFileName(processConfig);
- lockFile = new File(lockFileName);
- if (!lockFile.exists()) {
- FileOutputStream fos = new FileOutputStream(lockFile);
- FileLock fl = fos.getChannel().tryLock();
- if (fl != null) {
- String message = "Task acquired by PID:"
- + SystemUtils.getPID() + " HOST:"
- + InetAddress.getLocalHost().getHostName();
- fos.write(message.getBytes());
- fos.close();
- logger.info(message + " Lock File location: "
- + lockFile.getAbsolutePath());
- return true;
- }
- }
- } catch (Exception e) {
- logger.error("Exception trying to take up process:" + processConfig,
- e);
- } finally {
- if (lockFile != null) {
- lockFile.deleteOnExit();
- }
- }
- return false;
- }
-
- private String createLockFileName(Map<String, String> processConfig) {
- String lockDir = System.getProperty("lock_dir");
- String lockFileName = clusterName + processConfig.get("ID");
- if (lockDir != null && lockDir.trim().length() > 0) {
- File file = new File(lockDir);
- if (!file.exists()) {
- file.mkdirs();
- }
- return lockDir + "/" + lockFileName;
- } else {
- return lockFileName;
- }
- }
-
- private boolean canTakeupProcess(Map<String, String> processConfig) {
- String host = processConfig.get("process.host");
- try {
- InetAddress inetAddress = InetAddress.getByName(host);
- logger.info("Host Name: "
- + InetAddress.getLocalHost().getCanonicalHostName());
- if (!host.equals("localhost")) {
- if (!InetAddress.getLocalHost().equals(inetAddress)) {
- return false;
- }
- }
- } catch (Exception e) {
- logger.error("Invalid host:" + host);
- return false;
- }
- String lockFileName = createLockFileName(processConfig);
- File lockFile = new File(lockFileName);
- if (!lockFile.exists()) {
- return true;
- } else {
- logger.info("Process taken up by another process lockFile:"
- + lockFileName);
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/test/ProcessMonitorTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/test/ProcessMonitorTest.java b/s4-comm/src/main/java/io/s4/comm/test/ProcessMonitorTest.java
deleted file mode 100644
index a31b70b..0000000
--- a/s4-comm/src/main/java/io/s4/comm/test/ProcessMonitorTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.test;
-
-import io.s4.comm.file.StaticProcessMonitor;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.zk.ZkTaskSetup;
-import io.s4.comm.zk.ZkTaskManager;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ProcessMonitorTest {
- public static void main(String[] args) throws Exception {
- // testZkProcessMonitor(args);
- testStaticProcessMonitor(args);
- Thread.sleep(10000);
- }
-
- private static void testStaticProcessMonitor(String[] args) {
- String address = null;
- address = "localhost:2181";
- StaticProcessMonitor monitor = new StaticProcessMonitor(address,
- "taskmanagerTest",
- ClusterType.S4);
- monitor.monitor();
- System.out.println(monitor.getDestinationList());
- System.out.println(monitor.getDestinationMap());
- }
-
- private static void testZkProcessMonitor(String[] args) {
- System.out.println("Hereh");
- // "effortfell.greatamerica.corp.yahoo.com:2181"
- String address = args[0];
- address = "localhost:2181";
- String processName = args[1];
- ZkTaskSetup zkTaskSetup = new ZkTaskSetup(address,
- "/taskmanagerTest",
- ClusterType.S4);
- zkTaskSetup.cleanUp();
- zkTaskSetup.setUpTasks("1.0.0.", new String[] { "task0", "task1" });
- Object obj;
- System.out.println(processName + " Going to Wait for a task");
- HashMap<String, String> map = new HashMap<String, String>();
- ZkTaskManager taskManager = new ZkTaskManager(address,
- "/taskmanagerTest",
- ClusterType.S4);
- obj = taskManager.acquireTask(map);
- System.out.println(processName + "taking up task: " + obj);
- File f = new File("c:/" + obj + ".file");
- f.delete();
- while (true) {
- if (f.exists()) {
- break;
- }
- System.out.println(processName + " processing task: " + obj);
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- System.out.println("Exiting task:" + obj);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/test/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/test/TaskManagerTest.java b/s4-comm/src/main/java/io/s4/comm/test/TaskManagerTest.java
deleted file mode 100644
index 1ba32d7..0000000
--- a/s4-comm/src/main/java/io/s4/comm/test/TaskManagerTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.test;
-
-import io.s4.comm.core.TaskManager;
-import io.s4.comm.file.StaticTaskManager;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.zk.ZkTaskSetup;
-import io.s4.comm.zk.ZkTaskManager;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TaskManagerTest {
- public static void main(String[] args) throws Exception {
- // testZkTaskManager(args);
- testStaticTaskManager(args);
- Thread.sleep(10000);
- }
-
- private static void testStaticTaskManager(String[] args) {
- String address = null;
- address = "localhost:2181";
- TaskManager taskManager = new StaticTaskManager(address,
- "taskmanagerTest",
- ClusterType.S4,
- null);
- Map<String, String> customTaskData = new HashMap<String, String>();
- Object acquireTask = taskManager.acquireTask(customTaskData);
- System.out.println("Acuired Task:" + acquireTask);
-
- }
-
- private static void testZkTaskManager(String[] args) {
- System.out.println("Here");
- // "effortfell.greatamerica.corp.yahoo.com:2181"
- String address = args[0];
- address = "localhost:2181";
- String processName = args[1];
- ZkTaskSetup taskSetup = new ZkTaskSetup(address,
- "/taskmanagerTest",
- ClusterType.S4);
- taskSetup.cleanUp();
- taskSetup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" });
- Object obj;
- System.out.println(processName + " Going to Wait for a task");
- HashMap<String, String> map = new HashMap<String, String>();
- ZkTaskManager taskManager = new ZkTaskManager(address,
- "/taskmanagerTest",
- ClusterType.S4);
- obj = taskManager.acquireTask(map);
- System.out.println(processName + "taking up task: " + obj);
- File f = new File("c:/" + obj + ".file");
- f.delete();
- while (true) {
- if (f.exists()) {
- break;
- }
- System.out.println(processName + " processing task: " + obj);
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- System.out.println("Exiting task:" + obj);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/test/TestTaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/test/TestTaskSetupApp.java b/s4-comm/src/main/java/io/s4/comm/test/TestTaskSetupApp.java
deleted file mode 100644
index 6ced765..0000000
--- a/s4-comm/src/main/java/io/s4/comm/test/TestTaskSetupApp.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.test;
-
-import io.s4.comm.util.CommUtil;
-import io.s4.comm.util.JSONUtil;
-import io.s4.comm.util.ConfigParser.Cluster.ClusterType;
-import io.s4.comm.zk.ZkTaskSetup;
-import io.s4.comm.zk.ZkTaskManager;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-public class TestTaskSetupApp {
-
- public static void main(String[] args) throws Exception {
- new TestTaskSetupApp().testTaskSetup1();
- }
-
- // test the case
- public void testTaskSetup1() throws Exception {
- String address = "effortfell.greatamerica.corp.yahoo.com:2181";
- Watcher watcher = new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
-
- }
-
- };
- // setup
- ZooKeeper zk = new ZooKeeper(address, 30000, watcher);
- String root = "/tasksetup_app_test";
- ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4);
- Map<String, String> task1 = new HashMap<String, String>();
- task1.put("name", "task-1");
-
- Map<String, String> task2 = new HashMap<String, String>();
- task2.put("name", "task-2");
- String tasksListRoot = root + "/tasks";
- zkSetup.cleanUp();
- Stat exists = zk.exists(tasksListRoot, false);
- myassert(exists == null);
- Object[] data = new Object[] { task1, task2 };
- zkSetup.setUpTasks(data);
-
- // verify that tasks are created
- exists = zk.exists(tasksListRoot, false);
- myassert(exists != null);
- List<String> children = zk.getChildren(tasksListRoot, false);
- myassert(children.size() == data.length);
- boolean[] matched = new boolean[data.length];
- for (String child : children) {
- System.out.println(child);
- String childPath = tasksListRoot + "/" + child;
- Stat sTemp = zk.exists(childPath, false);
- byte[] tempData = zk.getData(tasksListRoot + "/" + child,
- false,
- sTemp);
- Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
- // check if it matches any of the data
- for (int i = 0; i < data.length; i++) {
- Map<String, Object> newData = (Map<String, Object>) data[i];
- if (!matched[i] && CommUtil.compareMaps(newData, map)) {
- matched[i] = true;
- break;
- }
- }
- }
- for (int i = 0; i < matched.length; i++) {
- myassert(matched[i]);
- }
-
- // try running again and make verify new node is not created
- Stat oldStat = zk.exists(tasksListRoot, false);
- System.out.println("oldStat=" + oldStat);
- zkSetup.setUpTasks(data);
- Stat newStat = zk.exists(tasksListRoot, false);
- System.out.println("newstat=" + newStat);
- myassert(oldStat.getMtime() == newStat.getMtime());
-
- // make change to task config and try running again and verify new
- // config is uploaded
- oldStat = zk.exists(tasksListRoot, false);
- System.out.println("oldStat=" + oldStat.getVersion());
- ((Map<String, String>) data[data.length - 1]).put("name", "changedname");
- zkSetup.setUpTasks(data);
- newStat = zk.exists(tasksListRoot, false);
- System.out.println("newstat=" + newStat.getVersion());
- System.out.println();
- myassert(oldStat.getMtime() != newStat.getMtime());
-
- // ensure version change is working
- zkSetup.setUpTasks("1.0.0.0", data);
- }
-
- private void myassert(boolean b) {
- if (!b) {
- throw new AssertionError();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/test/ZkQueueTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/test/ZkQueueTest.java b/s4-comm/src/main/java/io/s4/comm/test/ZkQueueTest.java
deleted file mode 100644
index ebfef29..0000000
--- a/s4-comm/src/main/java/io/s4/comm/test/ZkQueueTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.test;
-
-import io.s4.comm.zk.ZkQueue;
-
-import org.apache.zookeeper.KeeperException;
-
-public class ZkQueueTest {
- public static void main(String args[]) {
- ZkQueue q = new ZkQueue(args[0], "/app1");
-
- System.out.println("Input: " + args[0]);
- int i;
- Integer max = new Integer(args[1]);
-
- if (args[2].equals("p")) {
- System.out.println("Producer");
- for (i = 0; i < max; i++)
- try {
- q.produce(new Integer(10 + i));
- } catch (KeeperException e) {
-
- } catch (InterruptedException e) {
-
- }
- } else {
- System.out.println("Consumer");
-
- for (i = 0; i < max || true; i++) {
- try {
- Integer r = (Integer) q.consume();
- System.out.println("Item: " + r);
- } catch (KeeperException e) {
- e.printStackTrace();
- i--;
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/tools/TaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/tools/TaskSetupApp.java b/s4-comm/src/main/java/io/s4/comm/tools/TaskSetupApp.java
deleted file mode 100644
index 7cdde01..0000000
--- a/s4-comm/src/main/java/io/s4/comm/tools/TaskSetupApp.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.tools;
-
-import io.s4.comm.util.ConfigUtils;
-import io.s4.comm.util.ConfigParser;
-import io.s4.comm.util.ConfigParser.Cluster;
-import io.s4.comm.util.ConfigParser.Config;
-import io.s4.comm.zk.ZkTaskSetup;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class will set up initial tasks on the zookeeper USAGE: java AppTask
- * [clean] setup config.xml
- *
- * @author kishoreg
- *
- */
-public class TaskSetupApp {
- public static void main(String[] args) {
- String zkAddress = "";
- boolean clean = false;
- boolean setup = false;
- String setupXml = null;
- for (int i = 0; i < args.length; i++) {
- if (i == 0) {
- zkAddress = args[0];
- }
- if (args[i].equals("clean")) {
- clean = true;
- } else if (args[i].equals("setup")) {
- setup = true;
- } else if (i == args.length - 1) {
- setupXml = args[i];
- }
- }
- if (setupXml == null || !new File(setupXml).exists()) {
- printusage("Set up xml: " + setupXml + " does not exist");
- }
- if (!setup && !clean) {
- System.err.println("Invalid usage.");
- printusage("Must specify at least one of of clean, setup.");
- }
- doMain(zkAddress, clean, setup, setupXml);
- }
-
- private static void printusage(String message) {
- System.err.println(message);
- System.err.println("java TaskSetupApp <zk_address> [clean|setup] setup_config_xml");
- System.exit(1);
- }
-
- private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) {
- ConfigParser parser = new ConfigParser();
- Config config = parser.parse(setupXml);
- for (Cluster cluster : config.getClusters()) {
- processCluster(clean, zkAddress, cluster, config.getVersion());
- }
- }
-
- private static void processCluster(boolean clean, String zkAddress, Cluster cluster, String version) {
- List<Map<String,String>> clusterInfo = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false);
- ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType());
- if (clean) {
- zkSetup.cleanUp();
- }
-
- zkSetup.setUpTasks(version, clusterInfo.toArray());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/io/s4/comm/util/CommUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/io/s4/comm/util/CommUtil.java b/s4-comm/src/main/java/io/s4/comm/util/CommUtil.java
deleted file mode 100644
index 45f2e91..0000000
--- a/s4-comm/src/main/java/io/s4/comm/util/CommUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.comm.util;
-
-import java.util.Map;
-
-public class CommUtil {
-
- public static boolean compareMaps(Map<String, Object> map1, Map<String, Object> map2) {
- boolean equals = true;
- if (map1.size() == map2.size()) {
- for (String key : map1.keySet()) {
- if (!(map2.containsKey(key) && map1.get(key)
- .equals(map2.get(key)))) {
- equals = false;
- break;
- }
- }
- } else {
- equals = false;
- }
- return equals;
- }
-
-}