You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2012/12/20 19:06:49 UTC
[17/19] git commit: Adding a standalone adapter
Adding a standalone adapter
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/5e797c8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/5e797c8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/5e797c8a
Branch: refs/heads/S4-110
Commit: 5e797c8a57baa4f2a4a0e117bfebe0d9851f36db
Parents: f8cec68
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Mon Nov 26 15:57:00 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Mon Nov 26 15:57:00 2012 -0800
----------------------------------------------------------------------
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 6 +-
.../apache/s4/comm/topology/ClusterFromHelix.java | 3 +
.../src/main/java/org/apache/s4/core/Server.java | 7 ++-
.../org/apache/s4/tools/GenericEventAdapter.java | 47 +++++++++++++++
4 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/5e797c8a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 719c90c..5d59be7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterChangeListener;
import org.apache.s4.comm.topology.ClusterNode;
@@ -86,7 +87,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
private final Lock lock;
@Inject
- SerializerDeserializer serDeser;
+ SerializerDeserializer serDeser= new KryoSerDeser();
@Inject
public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
@@ -166,7 +167,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
}
}
- Channel c = nodeChannelMap.get(partitionId);
+ Channel c = nodeChannelMap.get(config);
if (c == null)
return;
@@ -258,7 +259,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
@Override
public void onChange() {
- // TODO Auto-generated method stub
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/5e797c8a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 7257908..67053a9 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -81,10 +81,13 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
NotificationContext changeContext) {
lock.lock();
try {
+ logger.info("Start:Processing change in cluster topology");
super.onExternalViewChange(externalViewList, changeContext);
for(ClusterChangeListener listener:listeners){
listener.onChange();
}
+ logger.info("End:Processing change in cluster topology");
+
} catch (Exception e) {
logger.error("", e);
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/5e797c8a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 23bdf51..666b7e7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -28,12 +28,14 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.s4.base.util.S4RLoader;
import org.apache.s4.base.util.S4RLoaderFactory;
import org.apache.s4.comm.helix.S4StateModelFactory;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromHelix;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.deploy.AppStateModelFactory;
import org.apache.s4.deploy.DeploymentManager;
@@ -80,6 +82,9 @@ public class Server {
@Inject
private AppStateModelFactory appStateModelFactory;
+ @Inject
+ private Cluster cluster;
+
/**
*
*/
@@ -123,12 +128,12 @@ public class Server {
{
helixManager = HelixManagerFactory.getZKHelixManager(clusterName,
instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
-
helixManager.getStateMachineEngine().registerStateModelFactory(
"LeaderStandby", taskStateModelFactory);
helixManager.getStateMachineEngine().registerStateModelFactory(
"OnlineOffline", appStateModelFactory);
helixManager.connect();
+ helixManager.addExternalViewChangeListener((RoutingTableProvider)cluster);
} catch (Exception e)
{
// TODO Auto-generated catch block
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/5e797c8a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
new file mode 100644
index 0000000..88acb4f
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
@@ -0,0 +1,47 @@
+package org.apache.s4.tools;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.ClusterFromHelix;
+
+public class GenericEventAdapter
+{
+
+ public static void main(String[] args)
+ {
+ try
+ {
+ String clusterName = "cluster1";
+ String instanceName = "adapter";
+ String zkAddr= "localhost:2181";
+ HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.SPECTATOR, zkAddr);
+ ClusterFromHelix cluster = new ClusterFromHelix("cluster1","localhost:2181",30,60);
+ manager.connect();
+ manager.addExternalViewChangeListener(cluster);
+
+
+ TCPEmitter emitter = new TCPEmitter(cluster, 1000);
+ while(true){
+ int partitionId = ((int)(Math.random()*1000))%4;
+ Event event = new Event();
+ event.put("name", String.class, "Hello world to partition:"+ partitionId);
+ KryoSerDeser serializer = new KryoSerDeser();
+ EventMessage message = new EventMessage("-1", "names", serializer.serialize(event));
+ System.out.println("Sending event to partition");
+ emitter.send(partitionId, message);
+ Thread.sleep(1000);
+ }
+ } catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+
+}