You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/02 09:26:00 UTC

git commit: Inline clusters configuration for Zookeeper server start - also added a default test mode, through option -t

Updated Branches:
  refs/heads/S4-60 [created] e438fbff9


Inline clusters configuration for Zookeeper server start
- also added a default test mode, through option -t


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/e438fbff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/e438fbff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/e438fbff

Branch: refs/heads/S4-60
Commit: e438fbff99a8a15ae9c3d8184a32d51f256e30bd
Parents: b712f9c
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Jul 1 12:41:00 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Jul 2 11:25:18 2012 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/tools/Tools.java   |    2 +-
 .../main/java/org/apache/s4/tools/ZKServer.java    |   68 +++++++++++++++
 2 files changed, 69 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e438fbff/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 733c7ad..4391a49 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -97,7 +97,7 @@ public class Tools {
             }
             jc.parse(cliArgs);
         } catch (Exception e) {
-            JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
+            JCommander.getConsole().println("Cannot parse arguments: " + e.getClass() + " -> " + e.getMessage());
             jc.usage();
             System.exit(1);
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e438fbff/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
index 0842f07..687e8cd 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -1,19 +1,27 @@
 package org.apache.s4.tools;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.commons.io.FileUtils;
+import org.apache.s4.comm.tools.TaskSetup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
 public class ZKServer {
 
+    private static final String TEST_MODE_CLUSTER_CONF_2 = "c=testCluster2:flp=13000:nbTasks=1";
+    private static final String TEST_MODE_CLUSTER_CONF_1 = "c=testCluster1:flp=12000:nbTasks=1";
     static Logger logger = LoggerFactory.getLogger(ZKServer.class);
 
     public static void main(String[] args) {
@@ -39,6 +47,28 @@ public class ZKServer {
 
             ZkServer zkServer = new ZkServer(zkArgs.dataDir, zkArgs.logDir, defaultNameSpace, zkArgs.zkPort);
             zkServer.start();
+
+            logger.info("Zookeeper server started on port [{}]", zkArgs.zkPort);
+
+            // now upload cluster configs if specified or if using test mode
+            List<ClusterConfig> clusterConfigs = zkArgs.clusterConfigs;
+            if (zkArgs.testMode && (clusterConfigs == null)) {
+                logger.info("Initializing test mode with default clusters configurations");
+                clusterConfigs = new ArrayList<ZKServer.ClusterConfig>() {
+                    {
+                        add(new ClusterConfig(TEST_MODE_CLUSTER_CONF_1));
+                        add(new ClusterConfig(TEST_MODE_CLUSTER_CONF_2));
+                    }
+                };
+            }
+            for (ClusterConfig config : clusterConfigs) {
+                TaskSetup taskSetup = new TaskSetup("localhost:" + zkArgs.zkPort);
+                taskSetup.clean(config.clusterName);
+                taskSetup.setup(config.clusterName, config.nbTasks, config.firstListeningPort);
+                logger.info("Defined S4 cluster [{}] with [{}] tasks with first listening port [{}]", new String[] {
+                        config.clusterName, String.valueOf(config.nbTasks), String.valueOf(config.firstListeningPort) });
+            }
+
         } catch (Exception e) {
             logger.error("Cannot initialize zookeeper with specified configuration", e);
         }
@@ -61,6 +91,44 @@ public class ZKServer {
         String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
                 + "zookeeper" + File.separator + "log").getAbsolutePath();
 
+        @Parameter(names = { "-t", "-testMode" }, description = "Launch Zookeeper instance and load a default cluster configuration for easy testing (2 clusters with following configs: {"
+                + TEST_MODE_CLUSTER_CONF_1 + "} and {" + TEST_MODE_CLUSTER_CONF_2 + "}")
+        boolean testMode = false;
+
+        @Parameter(names = "-clusters", description = "Inline clusters configuration, comma-separated list of curly-braces enclosed cluster definitions with format: {c=<cluster name>:flp=<first listening port for this cluster>:nbTasks=<number of tasks>} (Overrides default configuration for test mode)", converter = ClusterConfigsConverter.class)
+        List<ClusterConfig> clusterConfigs;
+
+    }
+
+    /** JCommander converter turning one {@code -clusters} definition, e.g. {@code {c=cluster1:flp=12000:nbTasks=1}}, into a ClusterConfig; the pattern is compiled once and reused by every call. */
+    public static class ClusterConfigsConverter implements IStringConverter<ClusterConfig> {
+        private static final Pattern CLUSTER_CONFIG_PATTERN = Pattern.compile("\\{(c=\\w+[:]flp=\\d+[:]nbTasks=\\d+)\\}");
+
+        @Override
+        public ClusterConfig convert(String arg) {
+            logger.info("processing cluster configuration {}", arg);
+            Matcher configMatcher = CLUSTER_CONFIG_PATTERN.matcher(arg);
+            if (!configMatcher.find()) {
+                throw new IllegalArgumentException("Cannot understand parameter " + arg);
+            }
+            return new ClusterConfig(configMatcher.group(1)); // group 1: the definition without its curly braces
+        }
+    }
+
+    /** One cluster definition: name, first listening port and number of tasks, parsed from a colon-separated string. */
+    public static class ClusterConfig {
+        /** @param config definition such as {@code c=testCluster1:flp=12000:nbTasks=1}: three key=value pairs in that order, no curly braces */
+        public ClusterConfig(String config) {
+            String[] split = config.split(":");
+            this.clusterName = split[0].split("=")[1];
+            // parseInt yields the primitive directly; Integer.valueOf created a boxed Integer only to unbox it again
+            this.firstListeningPort = Integer.parseInt(split[1].split("=")[1]);
+            this.nbTasks = Integer.parseInt(split[2].split("=")[1]);
+        }
+
+        String clusterName;
+        int firstListeningPort;
+        int nbTasks;
     }
 
 }