You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2012/12/20 19:06:49 UTC
[14/19] git commit: Bug fixes and made the walk through
Bug fixes and made the walk through
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/9fddde72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/9fddde72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/9fddde72
Branch: refs/heads/S4-110
Commit: 9fddde7216e4771fa1cd52ecfb6258a656be2426
Parents: 337baca
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Tue Nov 27 23:28:07 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Tue Nov 27 23:28:07 2012 -0800
----------------------------------------------------------------------
.../s4/comm/topology/AssignmentFromHelix.java | 1 -
subprojects/s4-core/src/main/resources/logback.xml | 2 +-
.../main/java/org/apache/s4/tools/DeployApp.java | 3 +-
.../org/apache/s4/tools/GenericEventAdapter.java | 63 +++++++++++---
.../src/main/java/org/apache/s4/tools/Tools.java | 6 +-
5 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index fde198c..7b39701 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -62,7 +62,6 @@ public class AssignmentFromHelix implements Assignment
{
this.taskStateModelFactory = new S4StateModelFactory();
// this.appStateModelFactory = appStateModelFactory;
- System.out.println("here i am");
this.clusterName = clusterName;
this.zookeeperAddress = zookeeperAddress;
machineId = "localhost";
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-core/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/logback.xml b/subprojects/s4-core/src/main/resources/logback.xml
index ea8c85a..6b246ee 100644
--- a/subprojects/s4-core/src/main/resources/logback.xml
+++ b/subprojects/s4-core/src/main/resources/logback.xml
@@ -8,7 +8,7 @@
</encoder>
</appender>
- <root level="debug">
+ <root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
index 854177a..159f3f8 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
@@ -1,5 +1,6 @@
package org.apache.s4.tools;
+import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -30,7 +31,7 @@ public class DeployApp extends S4ArgsBase
ConfigScopeBuilder builder = new ConfigScopeBuilder();
ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
Map<String, String> properties = new HashMap<String, String>();
- properties.put(DistributedDeploymentManager.S4R_URI, deployArgs.s4rPath);
+ properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
admin.setConfig(scope, properties);
IdealState is = new IdealState(deployArgs.appName);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
index 88acb4f..b39bbc1 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
@@ -1,47 +1,82 @@
package org.apache.s4.tools;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.IdealState;
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ClusterFromHelix;
+import org.apache.s4.tools.DeployApp.DeployAppArgs;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
public class GenericEventAdapter
{
-
+
public static void main(String[] args)
{
+ AdapterArgs adapterArgs = new AdapterArgs();
+
+ Tools.parseArgs(adapterArgs, args);
try
{
- String clusterName = "cluster1";
String instanceName = "adapter";
- String zkAddr= "localhost:2181";
- HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.SPECTATOR, zkAddr);
- ClusterFromHelix cluster = new ClusterFromHelix("cluster1","localhost:2181",30,60);
+ HelixManager manager = HelixManagerFactory.getZKHelixManager(
+ adapterArgs.clusterName, instanceName, InstanceType.SPECTATOR,
+ adapterArgs.zkConnectionString);
+ ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName,
+ adapterArgs.zkConnectionString, 30, 60);
manager.connect();
manager.addExternalViewChangeListener(cluster);
-
-
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ IdealState idealstate = helixDataAccessor.getProperty(keyBuilder
+ .idealStates(adapterArgs.streamName));
TCPEmitter emitter = new TCPEmitter(cluster, 1000);
- while(true){
- int partitionId = ((int)(Math.random()*1000))%4;
+ while (true)
+ {
+ int partitionId = ((int) (Math.random() * 1000))
+ % idealstate.getNumPartitions();
Event event = new Event();
- event.put("name", String.class, "Hello world to partition:"+ partitionId);
+ event.put(adapterArgs.streamName, String.class,
+ "Hello world to partition:" + partitionId);
KryoSerDeser serializer = new KryoSerDeser();
- EventMessage message = new EventMessage("-1", "names", serializer.serialize(event));
- System.out.println("Sending event to partition");
+ EventMessage message = new EventMessage("-1", adapterArgs.streamName,
+ serializer.serialize(event));
+ System.out.println("Sending event to partition:"+partitionId);
emitter.send(partitionId, message);
Thread.sleep(1000);
}
} catch (Exception e)
{
- // TODO Auto-generated catch block
e.printStackTrace();
}
-
+
+ }
+
+ @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+ static class AdapterArgs extends S4ArgsBase
+ {
+
+ @Parameter(names = "-zk", description = "ZooKeeper connection string")
+ String zkConnectionString = "localhost:2181";
+
+ @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+ String clusterName;
+
+ @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event will be sent to", required = true)
+ String streamName;
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9fddde72/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index dbd63b9..67bf9d3 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -38,9 +38,9 @@ public class Tools {
static Logger logger = LoggerFactory.getLogger(Tools.class);
- enum Task {
- deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(null), newApp(
- CreateApp.class), s4r(Package.class), status(Status.class);
+ enum Task { deployApp(DeployApp.class),
+ deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(GenericEventAdapter.class), newApp(
+ CreateApp.class), s4r(Package.class), status(Status.class),createTask(CreateTask.class);
Class<?> target;