You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2012/12/20 19:06:49 UTC

[14/19] git commit: Bug fixes and made the walk through

Bug fixes and made the walk through


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/9fddde72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/9fddde72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/9fddde72

Branch: refs/heads/S4-110
Commit: 9fddde7216e4771fa1cd52ecfb6258a656be2426
Parents: 337baca
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Tue Nov 27 23:28:07 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Tue Nov 27 23:28:07 2012 -0800

----------------------------------------------------------------------
 .../s4/comm/topology/AssignmentFromHelix.java      |    1 -
 subprojects/s4-core/src/main/resources/logback.xml |    2 +-
 .../main/java/org/apache/s4/tools/DeployApp.java   |    3 +-
 .../org/apache/s4/tools/GenericEventAdapter.java   |   63 +++++++++++---
 .../src/main/java/org/apache/s4/tools/Tools.java   |    6 +-
 5 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index fde198c..7b39701 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -62,7 +62,6 @@ public class AssignmentFromHelix implements Assignment
   {
     this.taskStateModelFactory = new S4StateModelFactory();
 //    this.appStateModelFactory = appStateModelFactory;
-    System.out.println("here i am");
     this.clusterName = clusterName;
     this.zookeeperAddress = zookeeperAddress;
     machineId = "localhost";

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-core/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/logback.xml b/subprojects/s4-core/src/main/resources/logback.xml
index ea8c85a..6b246ee 100644
--- a/subprojects/s4-core/src/main/resources/logback.xml
+++ b/subprojects/s4-core/src/main/resources/logback.xml
@@ -8,7 +8,7 @@
     </encoder>
   </appender>
 
-  <root level="debug">
+  <root level="info">
     <appender-ref ref="STDOUT" />
   </root>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
index 854177a..159f3f8 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
@@ -1,5 +1,6 @@
 package org.apache.s4.tools;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -30,7 +31,7 @@ public class DeployApp extends S4ArgsBase
     ConfigScopeBuilder builder = new ConfigScopeBuilder();
     ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
     Map<String, String> properties = new HashMap<String, String>();
-    properties.put(DistributedDeploymentManager.S4R_URI, deployArgs.s4rPath);
+    properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
     admin.setConfig(scope, properties);
     
     IdealState is = new IdealState(deployArgs.appName);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
index 88acb4f..b39bbc1 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
@@ -1,47 +1,82 @@
 package org.apache.s4.tools;
 
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.IdealState;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ClusterFromHelix;
+import org.apache.s4.tools.DeployApp.DeployAppArgs;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
 
 public class GenericEventAdapter
 {
-  
+
   public static void main(String[] args)
   {
+    AdapterArgs adapterArgs = new AdapterArgs();
+
+    Tools.parseArgs(adapterArgs, args);
     try
     {
-      String clusterName = "cluster1";
       String instanceName = "adapter";
-      String zkAddr= "localhost:2181";
-      HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.SPECTATOR, zkAddr);
-      ClusterFromHelix cluster = new ClusterFromHelix("cluster1","localhost:2181",30,60);
+      HelixManager manager = HelixManagerFactory.getZKHelixManager(
+          adapterArgs.clusterName, instanceName, InstanceType.SPECTATOR,
+          adapterArgs.zkConnectionString);
+      ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName,
+          adapterArgs.zkConnectionString, 30, 60);
       manager.connect();
       manager.addExternalViewChangeListener(cluster);
-      
-      
+
+      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = helixDataAccessor.keyBuilder();
+      IdealState idealstate = helixDataAccessor.getProperty(keyBuilder
+          .idealStates(adapterArgs.streamName));
       TCPEmitter emitter = new TCPEmitter(cluster, 1000);
-      while(true){
-        int partitionId = ((int)(Math.random()*1000))%4;
+      while (true)
+      {
+        int partitionId = ((int) (Math.random() * 1000))
+            % idealstate.getNumPartitions();
         Event event = new Event();
-        event.put("name", String.class, "Hello world to partition:"+ partitionId);
+        event.put(adapterArgs.streamName, String.class,
+            "Hello world to partition:" + partitionId);
         KryoSerDeser serializer = new KryoSerDeser();
-        EventMessage message = new EventMessage("-1", "names", serializer.serialize(event));
-        System.out.println("Sending event to partition");
+        EventMessage message = new EventMessage("-1", adapterArgs.streamName,
+            serializer.serialize(event));
+        System.out.println("Sending event to partition:"+partitionId);
         emitter.send(partitionId, message);
         Thread.sleep(1000);
       }
     } catch (Exception e)
     {
-      // TODO Auto-generated catch block
       e.printStackTrace();
     }
-    
+
+  }
+
+  @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+  static class AdapterArgs extends S4ArgsBase
+  {
+
+    @Parameter(names = "-zk", description = "ZooKeeper connection string")
+    String zkConnectionString = "localhost:2181";
+
+    @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+    String clusterName;
+
+    @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event will be sent to", required = true)
+    String streamName;
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index dbd63b9..67bf9d3 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -38,9 +38,9 @@ public class Tools {
 
     static Logger logger = LoggerFactory.getLogger(Tools.class);
 
-    enum Task {
-        deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(null), newApp(
-                CreateApp.class), s4r(Package.class), status(Status.class);
+    enum Task { deployApp(DeployApp.class),
+        deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(GenericEventAdapter.class), newApp(
+                CreateApp.class), s4r(Package.class), status(Status.class),createTask(CreateTask.class);
 
         Class<?> target;