You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2012/12/20 19:06:49 UTC

[17/19] git commit: Adding a standalone adapter

Adding a standalone adapter


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/5e797c8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/5e797c8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/5e797c8a

Branch: refs/heads/S4-110
Commit: 5e797c8a57baa4f2a4a0e117bfebe0d9851f36db
Parents: f8cec68
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Mon Nov 26 15:57:00 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Mon Nov 26 15:57:00 2012 -0800

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    6 +-
 .../apache/s4/comm/topology/ClusterFromHelix.java  |    3 +
 .../src/main/java/org/apache/s4/core/Server.java   |    7 ++-
 .../org/apache/s4/tools/GenericEventAdapter.java   |   47 +++++++++++++++
 4 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/5e797c8a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 719c90c..5d59be7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterChangeListener;
 import org.apache.s4.comm.topology.ClusterNode;
@@ -86,7 +87,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 	private final Lock lock;
 
 	@Inject
-	SerializerDeserializer serDeser;
+	SerializerDeserializer serDeser= new KryoSerDeser();
 
 	@Inject
 	public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
@@ -166,7 +167,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 			}
 		}
 
-		Channel c = nodeChannelMap.get(partitionId);
+		Channel c = nodeChannelMap.get(config);
 		if (c == null)
 			return;
 
@@ -258,7 +259,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
 	@Override
 	public void onChange() {
-		// TODO Auto-generated method stub
 		
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/5e797c8a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 7257908..67053a9 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -81,10 +81,13 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
 			NotificationContext changeContext) {
 		lock.lock();
 		try {
+		  logger.info("Start:Processing change in cluster topology");
 			super.onExternalViewChange(externalViewList, changeContext);
 			for(ClusterChangeListener listener:listeners){
 			  listener.onChange();
 			}
+	    logger.info("End:Processing change in cluster topology");
+
 		} catch (Exception e) {
 			logger.error("", e);
 		} finally {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/5e797c8a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 23bdf51..666b7e7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -28,12 +28,14 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.s4.base.util.S4RLoader;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.helix.S4StateModelFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromHelix;
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.deploy.AppStateModelFactory;
 import org.apache.s4.deploy.DeploymentManager;
@@ -80,6 +82,9 @@ public class Server {
     @Inject
     private AppStateModelFactory appStateModelFactory;
     
+    @Inject
+    private Cluster cluster;
+    
     /**
      *
      */
@@ -123,12 +128,12 @@ public class Server {
       {
         helixManager = HelixManagerFactory.getZKHelixManager(clusterName,
             instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
-     
         helixManager.getStateMachineEngine().registerStateModelFactory(
           "LeaderStandby", taskStateModelFactory);
         helixManager.getStateMachineEngine().registerStateModelFactory(
           "OnlineOffline", appStateModelFactory);
         helixManager.connect();  
+        helixManager.addExternalViewChangeListener((RoutingTableProvider)cluster);
       } catch (Exception e)
       {
         // TODO Auto-generated catch block

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/5e797c8a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
new file mode 100644
index 0000000..88acb4f
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
@@ -0,0 +1,47 @@
+package org.apache.s4.tools;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.ClusterFromHelix;
+
+public class GenericEventAdapter
+{
+  
+  public static void main(String[] args)
+  {
+    try
+    {
+      String clusterName = "cluster1";
+      String instanceName = "adapter";
+      String zkAddr= "localhost:2181";
+      HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.SPECTATOR, zkAddr);
+      ClusterFromHelix cluster = new ClusterFromHelix("cluster1","localhost:2181",30,60);
+      manager.connect();
+      manager.addExternalViewChangeListener(cluster);
+      
+      
+      TCPEmitter emitter = new TCPEmitter(cluster, 1000);
+      while(true){
+        int partitionId = ((int)(Math.random()*1000))%4;
+        Event event = new Event();
+        event.put("name", String.class, "Hello world to partition:"+ partitionId);
+        KryoSerDeser serializer = new KryoSerDeser();
+        EventMessage message = new EventMessage("-1", "names", serializer.serialize(event));
+        System.out.println("Sending event to partition");
+        emitter.send(partitionId, message);
+        Thread.sleep(1000);
+      }
+    } catch (Exception e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    
+  }
+
+}